1use std::io;
26use std::pin::Pin;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::{fmt::Debug, fs::DirEntry};
30
31use futures::future::Either;
32use futures::Stream;
33use futures::{
34 future::{self, BoxFuture},
35 stream::BoxStream,
36 StreamExt, TryStreamExt,
37};
38use lance_io::object_writer::WriteResult;
39use log::warn;
40use object_store::PutOptions;
41use object_store::{path::Path, Error as ObjectStoreError, ObjectStore as OSObjectStore};
42use snafu::location;
43use url::Url;
44
45#[cfg(feature = "dynamodb")]
46pub mod dynamodb;
47pub mod external_manifest;
48
49use lance_core::{Error, Result};
50use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
51
52#[cfg(feature = "dynamodb")]
53use {
54 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
55 aws_credential_types::provider::error::CredentialsError,
56 aws_credential_types::provider::ProvideCredentials,
57 lance_io::object_store::{providers::aws::build_aws_credential, StorageOptions},
58 object_store::aws::AmazonS3ConfigKey,
59 object_store::aws::AwsCredentialProvider,
60 std::borrow::Cow,
61 std::time::{Duration, SystemTime},
62};
63
64use crate::format::{is_detached_version, IndexMetadata, Manifest};
65
66const VERSIONS_DIR: &str = "_versions";
67const MANIFEST_EXTENSION: &str = "manifest";
68const DETACHED_VERSION_PREFIX: &str = "d";
69
70#[derive(Clone, Copy, Debug, PartialEq, Eq)]
72pub enum ManifestNamingScheme {
73 V1,
75 V2,
79}
80
81impl ManifestNamingScheme {
82 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
83 let directory = base.child(VERSIONS_DIR);
84 if is_detached_version(version) {
85 let directory = base.child(VERSIONS_DIR);
90 directory.child(format!(
91 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
92 ))
93 } else {
94 match self {
95 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
96 Self::V2 => {
97 let inverted_version = u64::MAX - version;
98 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
99 }
100 }
101 }
102 }
103
104 pub fn parse_version(&self, filename: &str) -> Option<u64> {
105 let file_number = filename
106 .split_once('.')
107 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
109 match self {
110 Self::V1 => file_number,
111 Self::V2 => file_number.map(|v| u64::MAX - v),
112 }
113 }
114
115 pub fn detect_scheme(filename: &str) -> Option<Self> {
116 if filename.starts_with(DETACHED_VERSION_PREFIX) {
117 return Some(Self::V2);
119 }
120 if filename.ends_with(MANIFEST_EXTENSION) {
121 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
122 if filename.len() == V2_LEN {
123 Some(Self::V2)
124 } else {
125 Some(Self::V1)
126 }
127 } else {
128 None
129 }
130 }
131
132 pub fn detect_scheme_staging(filename: &str) -> Self {
133 if filename.chars().nth(20) == Some('.') {
136 Self::V2
137 } else {
138 Self::V1
139 }
140 }
141}
142
143pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
153 object_store
154 .inner
155 .list(Some(&dataset_base.child(VERSIONS_DIR)))
156 .try_filter(|res| {
157 let res = if let Some(filename) = res.location.filename() {
158 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
159 } else {
160 false
161 };
162 future::ready(res)
163 })
164 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
165 let filename = meta.location.filename().unwrap();
166 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
167 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
168 object_store.inner.rename(&meta.location, &path).await?;
169 Ok(())
170 })
171 .await?;
172
173 Ok(())
174}
175
176pub type ManifestWriter = for<'a> fn(
180 object_store: &'a ObjectStore,
181 manifest: &'a mut Manifest,
182 indices: Option<Vec<IndexMetadata>>,
183 path: &'a Path,
184) -> BoxFuture<'a, Result<WriteResult>>;
185
186#[derive(Debug, Clone)]
187pub struct ManifestLocation {
188 pub version: u64,
190 pub path: Path,
192 pub size: Option<u64>,
194 pub naming_scheme: ManifestNamingScheme,
196 pub e_tag: Option<String>,
200}
201
202impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
203 type Error = Error;
204
205 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
206 let filename = meta.location.filename().ok_or_else(|| Error::Internal {
207 message: "ObjectMeta location does not have a filename".to_string(),
208 location: location!(),
209 })?;
210 let scheme =
211 ManifestNamingScheme::detect_scheme(filename).ok_or_else(|| Error::Internal {
212 message: format!("Invalid manifest filename: '{}'", filename),
213 location: location!(),
214 })?;
215 let version = scheme
216 .parse_version(filename)
217 .ok_or_else(|| Error::Internal {
218 message: format!("Invalid manifest filename: '{}'", filename),
219 location: location!(),
220 })?;
221 Ok(Self {
222 version,
223 path: meta.location,
224 size: Some(meta.size),
225 naming_scheme: scheme,
226 e_tag: meta.e_tag,
227 })
228 }
229}
230
231async fn current_manifest_path(
233 object_store: &ObjectStore,
234 base: &Path,
235) -> Result<ManifestLocation> {
236 if object_store.is_local() {
237 if let Ok(Some(location)) = current_manifest_local(base) {
238 return Ok(location);
239 }
240 }
241
242 let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));
243
244 let mut valid_manifests = manifest_files.try_filter_map(|res| {
245 if let Some(scheme) = ManifestNamingScheme::detect_scheme(res.location.filename().unwrap())
246 {
247 future::ready(Ok(Some((scheme, res))))
248 } else {
249 future::ready(Ok(None))
250 }
251 });
252
253 let first = valid_manifests.next().await.transpose()?;
254 match (first, object_store.list_is_lexically_ordered) {
255 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
258 let version = scheme
259 .parse_version(meta.location.filename().unwrap())
260 .unwrap();
261
262 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
266 if scheme != ManifestNamingScheme::V2 {
267 warn!(
268 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
269 to migrate the directory."
270 );
271 break;
272 }
273 let next_version = scheme
274 .parse_version(meta.location.filename().unwrap())
275 .unwrap();
276 if next_version >= version {
277 warn!(
278 "List operation was expected to be lexically ordered, but was not. This \
279 could mean a corrupt read. Please make a bug report on the lancedb/lance \
280 GitHub repository."
281 );
282 break;
283 }
284 }
285
286 Ok(ManifestLocation {
287 version,
288 path: meta.location,
289 size: Some(meta.size),
290 naming_scheme: scheme,
291 e_tag: meta.e_tag,
292 })
293 }
294 (Some((scheme, meta)), _) => {
298 let mut current_version = scheme
299 .parse_version(meta.location.filename().unwrap())
300 .unwrap();
301 let mut current_meta = meta;
302
303 while let Some((scheme, meta)) = valid_manifests.next().await.transpose()? {
304 if matches!(scheme, ManifestNamingScheme::V2) {
305 return Err(Error::Internal {
306 message: "Found V2 manifest in a V1 manifest directory".to_string(),
307 location: location!(),
308 });
309 }
310 let version = scheme
311 .parse_version(meta.location.filename().unwrap())
312 .unwrap();
313 if version > current_version {
314 current_version = version;
315 current_meta = meta;
316 }
317 }
318 Ok(ManifestLocation {
319 version: current_version,
320 path: current_meta.location,
321 size: Some(current_meta.size),
322 naming_scheme: scheme,
323 e_tag: current_meta.e_tag,
324 })
325 }
326 (None, _) => Err(Error::NotFound {
327 uri: base.child(VERSIONS_DIR).to_string(),
328 location: location!(),
329 }),
330 }
331}
332
333fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
337 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
338 let entries = std::fs::read_dir(path)?;
339
340 let mut latest_entry: Option<(u64, DirEntry)> = None;
341
342 let mut scheme: Option<ManifestNamingScheme> = None;
343
344 for entry in entries {
345 let entry = entry?;
346 let filename_raw = entry.file_name();
347 let filename = filename_raw.to_string_lossy();
348
349 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
350 continue;
353 };
354
355 if let Some(scheme) = scheme {
356 if scheme != entry_scheme {
357 return Err(io::Error::new(
358 io::ErrorKind::InvalidData,
359 format!(
360 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
361 scheme, entry_scheme
362 ),
363 ));
364 }
365 } else {
366 scheme = Some(entry_scheme);
367 }
368
369 let Some(version) = entry_scheme.parse_version(&filename) else {
370 continue;
371 };
372
373 if let Some((latest_version, _)) = &latest_entry {
374 if version > *latest_version {
375 latest_entry = Some((version, entry));
376 }
377 } else {
378 latest_entry = Some((version, entry));
379 }
380 }
381
382 if let Some((version, entry)) = latest_entry {
383 let path = Path::from_filesystem_path(entry.path())
384 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
385 let metadata = entry.metadata()?;
386 Ok(Some(ManifestLocation {
387 version,
388 path,
389 size: Some(metadata.len()),
390 naming_scheme: scheme.unwrap(),
391 e_tag: Some(get_etag(&metadata)),
392 }))
393 } else {
394 Ok(None)
395 }
396}
397
/// Build an ETag for a local file from its metadata, without reading its contents.
///
/// Format: `{inode}-{mtime}-{size}`, each in lowercase hex. `mtime` is in
/// microseconds since the Unix epoch, or `0` if it is unavailable.
fn get_etag(metadata: &std::fs::Metadata) -> String {
    let mtime_micros = metadata
        .modified()
        .ok()
        .and_then(|modified| modified.duration_since(std::time::UNIX_EPOCH).ok())
        .map_or(0, |since_epoch| since_epoch.as_micros());
    let inode = get_inode(metadata);
    let size = metadata.len();
    format!("{inode:x}-{mtime_micros:x}-{size:x}")
}

/// Inode number of the file (Unix only).
#[cfg(unix)]
fn get_inode(metadata: &std::fs::Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;
    metadata.ino()
}

/// Inode numbers are not available on this platform, so this always returns `0`.
#[cfg(not(unix))]
fn get_inode(_metadata: &std::fs::Metadata) -> u64 {
    0
}
427
428fn list_manifests<'a>(
429 base_path: &Path,
430 object_store: &'a dyn OSObjectStore,
431) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
432 object_store
433 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
434 .filter_map(|obj_meta| {
435 futures::future::ready(
436 obj_meta
437 .map(|m| ManifestLocation::try_from(m).ok())
438 .transpose(),
439 )
440 })
441 .boxed()
442}
443
444fn make_staging_manifest_path(base: &Path) -> Result<Path> {
445 let id = uuid::Uuid::new_v4().to_string();
446 Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
447 source: Box::new(e),
448 location: location!(),
449 })
450}
451
/// Query-string key that carries the DynamoDB table name in `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
454
455#[async_trait::async_trait]
464pub trait CommitHandler: Debug + Send + Sync {
465 async fn resolve_latest_location(
466 &self,
467 base_path: &Path,
468 object_store: &ObjectStore,
469 ) -> Result<ManifestLocation> {
470 Ok(current_manifest_path(object_store, base_path).await?)
471 }
472
473 async fn resolve_version_location(
474 &self,
475 base_path: &Path,
476 version: u64,
477 object_store: &dyn OSObjectStore,
478 ) -> Result<ManifestLocation> {
479 default_resolve_version(base_path, version, object_store).await
480 }
481
482 fn list_manifest_locations<'a>(
489 &self,
490 base_path: &Path,
491 object_store: &'a ObjectStore,
492 sorted_descending: bool,
493 ) -> BoxStream<'a, Result<ManifestLocation>> {
494 let underlying_stream = list_manifests(base_path, &object_store.inner);
495
496 if !sorted_descending {
497 return underlying_stream.boxed();
498 }
499
500 async fn sort_stream(
501 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
502 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
503 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
504 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
505 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
506 }
507
508 if object_store.list_is_lexically_ordered {
511 let mut peekable = underlying_stream.peekable();
513
514 futures::stream::once(async move {
515 let naming_scheme = match Pin::new(&mut peekable).peek().await {
516 Some(Ok(m)) => m.naming_scheme,
517 Some(Err(_)) => ManifestNamingScheme::V2,
520 None => ManifestNamingScheme::V2,
521 };
522
523 if naming_scheme == ManifestNamingScheme::V2 {
524 Ok(Either::Left(peekable))
526 } else {
527 sort_stream(peekable).await.map(Either::Right)
528 }
529 })
530 .try_flatten()
531 .boxed()
532 } else {
533 futures::stream::once(sort_stream(underlying_stream))
538 .try_flatten()
539 .boxed()
540 }
541 }
542
543 async fn commit(
548 &self,
549 manifest: &mut Manifest,
550 indices: Option<Vec<IndexMetadata>>,
551 base_path: &Path,
552 object_store: &ObjectStore,
553 manifest_writer: ManifestWriter,
554 naming_scheme: ManifestNamingScheme,
555 ) -> std::result::Result<ManifestLocation, CommitError>;
556
557 async fn delete(&self, _base_path: &Path) -> Result<()> {
559 Ok(())
560 }
561}
562
563async fn default_resolve_version(
564 base_path: &Path,
565 version: u64,
566 object_store: &dyn OSObjectStore,
567) -> Result<ManifestLocation> {
568 if is_detached_version(version) {
569 return Ok(ManifestLocation {
570 version,
571 naming_scheme: ManifestNamingScheme::V2,
574 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
576 size: None,
577 e_tag: None,
578 });
579 }
580
581 let scheme = ManifestNamingScheme::V2;
583 let path = scheme.manifest_path(base_path, version);
584 match object_store.head(&path).await {
585 Ok(meta) => Ok(ManifestLocation {
586 version,
587 path,
588 size: Some(meta.size),
589 naming_scheme: scheme,
590 e_tag: meta.e_tag,
591 }),
592 Err(ObjectStoreError::NotFound { .. }) => {
593 let scheme = ManifestNamingScheme::V1;
595 Ok(ManifestLocation {
596 version,
597 path: scheme.manifest_path(base_path, version),
598 size: None,
599 naming_scheme: scheme,
600 e_tag: None,
601 })
602 }
603 Err(e) => Err(e.into()),
604 }
605}
/// Adapts an `object_store` [`AwsCredentialProvider`] to the AWS SDK's
/// [`ProvideCredentials`] trait so the DynamoDB client can share it.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = self
                .0
                .get_credential()
                .await
                .map_err(|err| CredentialsError::provider_error(Box::new(err)))?;

            // Report an expiry ten minutes from now.
            const EXPIRY_SECS: u64 = 60 * 10;
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(EXPIRY_SECS))
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
641
/// Create a DynamoDB-backed [`ExternalManifestStore`] using `table_name`.
///
/// The client uses the given `region`, the `creds` provider (adapted for the AWS
/// SDK), and `IdentityCache::no_cache()`. `endpoint`, when set, overrides the
/// endpoint URL.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        config::{IdentityCache, Region},
        Client,
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache());
    let config = match endpoint {
        Some(url) => base_config.endpoint_url(url),
        None => base_config,
    };

    let client = Client::from_conf(config.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
670
671pub async fn commit_handler_from_url(
672 url_or_path: &str,
673 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
675) -> Result<Arc<dyn CommitHandler>> {
676 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
677 Arc::new(RenameCommitHandler)
678 } else {
679 Arc::new(ConditionalPutCommitHandler)
680 };
681
682 let url = match Url::parse(url_or_path) {
683 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
684 return Ok(local_handler);
686 }
687 Ok(url) => url,
688 Err(_) => {
689 return Ok(local_handler);
690 }
691 };
692
693 match url.scheme() {
694 "file" | "file-object-store" => Ok(local_handler),
695 "s3" | "gs" | "az" | "memory" => Ok(Arc::new(ConditionalPutCommitHandler)),
696 #[cfg(not(feature = "dynamodb"))]
697 "s3+ddb" => Err(Error::InvalidInput {
698 source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
699 location: location!(),
700 }),
701 #[cfg(feature = "dynamodb")]
702 "s3+ddb" => {
703 if url.query_pairs().count() != 1 {
704 return Err(Error::InvalidInput {
705 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
706 .into(),
707 location: location!(),
708 });
709 }
710 let table_name = match url.query_pairs().next() {
711 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
712 if key == DDB_URL_QUERY_KEY =>
713 {
714 if table_name.is_empty() {
715 return Err(Error::InvalidInput {
716 source: "`s3+ddb://` scheme requires non empty dynamodb table name"
717 .into(),
718 location: location!(),
719 });
720 }
721 table_name
722 }
723 _ => {
724 return Err(Error::InvalidInput {
725 source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
726 .into(),
727 location: location!(),
728 });
729 }
730 };
731 let options = options.clone().unwrap_or_default();
732 let storage_options = StorageOptions(options.storage_options.unwrap_or_default());
733 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options);
734 let expires_at_millis = storage_options.expires_at_millis();
735 let storage_options = storage_options.as_s3_options();
736
737 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
738
739 let (aws_creds, region) = build_aws_credential(
740 options.s3_credentials_refresh_offset,
741 options.aws_credentials.clone(),
742 Some(&storage_options),
743 region,
744 options.storage_options_provider.clone(),
745 expires_at_millis,
746 )
747 .await?;
748
749 Ok(Arc::new(ExternalManifestCommitHandler {
750 external_manifest_store: build_dynamodb_external_store(
751 table_name,
752 aws_creds.clone(),
753 ®ion,
754 dynamo_endpoint,
755 "lancedb",
756 )
757 .await?,
758 }))
759 }
760 _ => Ok(Arc::new(UnsafeCommitHandler)),
761 }
762}
763
/// DynamoDB endpoint override.
///
/// Uses the `dynamodb_endpoint` storage option if present. Otherwise it uses the
/// `DYNAMODB_ENDPOINT` environment variable, when set and valid Unicode.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
772
773#[derive(Debug)]
775pub enum CommitError {
776 CommitConflict,
778 OtherError(Error),
780}
781
782impl From<Error> for CommitError {
783 fn from(e: Error) -> Self {
784 Self::OtherError(e)
785 }
786}
787
788impl From<CommitError> for Error {
789 fn from(e: CommitError) -> Self {
790 match e {
791 CommitError::CommitConflict => Self::Internal {
792 message: "Commit conflict".to_string(),
793 location: location!(),
794 },
795 CommitError::OtherError(e) => e,
796 }
797 }
798}
799
/// Process-wide flag recording that the "unsafe commit handler" warning has been
/// logged, so it is not repeated on every commit.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// Commit handler that writes manifests directly to their final path, without any
/// protection against concurrent writers.
pub struct UnsafeCommitHandler;
807
808#[async_trait::async_trait]
809impl CommitHandler for UnsafeCommitHandler {
810 async fn commit(
811 &self,
812 manifest: &mut Manifest,
813 indices: Option<Vec<IndexMetadata>>,
814 base_path: &Path,
815 object_store: &ObjectStore,
816 manifest_writer: ManifestWriter,
817 naming_scheme: ManifestNamingScheme,
818 ) -> std::result::Result<ManifestLocation, CommitError> {
819 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
821 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
822 log::warn!(
823 "Using unsafe commit handler. Concurrent writes may result in data loss. \
824 Consider providing a commit handler that prevents conflicting writes."
825 );
826 }
827
828 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
829 let res = manifest_writer(object_store, manifest, indices, &version_path).await?;
831
832 Ok(ManifestLocation {
833 version: manifest.version,
834 size: Some(res.size as u64),
835 naming_scheme,
836 path: version_path,
837 e_tag: res.e_tag,
838 })
839 }
840}
841
842impl Debug for UnsafeCommitHandler {
843 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
844 f.debug_struct("UnsafeCommitHandler").finish()
845 }
846}
847
848#[async_trait::async_trait]
850pub trait CommitLock: Debug {
851 type Lease: CommitLease;
852
853 async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
866}
867
868#[async_trait::async_trait]
869pub trait CommitLease: Send + Sync {
870 async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
872}
873
874#[async_trait::async_trait]
875impl<T: CommitLock + Send + Sync> CommitHandler for T {
876 async fn commit(
877 &self,
878 manifest: &mut Manifest,
879 indices: Option<Vec<IndexMetadata>>,
880 base_path: &Path,
881 object_store: &ObjectStore,
882 manifest_writer: ManifestWriter,
883 naming_scheme: ManifestNamingScheme,
884 ) -> std::result::Result<ManifestLocation, CommitError> {
885 let path = naming_scheme.manifest_path(base_path, manifest.version);
886 let lease = self.lock(manifest.version).await?;
889
890 match object_store.inner.head(&path).await {
892 Ok(_) => {
893 lease.release(false).await?;
896
897 return Err(CommitError::CommitConflict);
898 }
899 Err(ObjectStoreError::NotFound { .. }) => {}
900 Err(e) => {
901 lease.release(false).await?;
904
905 return Err(CommitError::OtherError(e.into()));
906 }
907 }
908 let res = manifest_writer(object_store, manifest, indices, &path).await;
909
910 lease.release(res.is_ok()).await?;
912
913 let res = res?;
914 Ok(ManifestLocation {
915 version: manifest.version,
916 size: Some(res.size as u64),
917 naming_scheme,
918 path,
919 e_tag: res.e_tag,
920 })
921 }
922}
923
924#[async_trait::async_trait]
925impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
926 async fn commit(
927 &self,
928 manifest: &mut Manifest,
929 indices: Option<Vec<IndexMetadata>>,
930 base_path: &Path,
931 object_store: &ObjectStore,
932 manifest_writer: ManifestWriter,
933 naming_scheme: ManifestNamingScheme,
934 ) -> std::result::Result<ManifestLocation, CommitError> {
935 self.as_ref()
936 .commit(
937 manifest,
938 indices,
939 base_path,
940 object_store,
941 manifest_writer,
942 naming_scheme,
943 )
944 .await
945 }
946}
947
/// Commit handler that writes each manifest to a staging path and then moves it
/// into place with a rename that fails if the target already exists.
pub struct RenameCommitHandler;
952
953#[async_trait::async_trait]
954impl CommitHandler for RenameCommitHandler {
955 async fn commit(
956 &self,
957 manifest: &mut Manifest,
958 indices: Option<Vec<IndexMetadata>>,
959 base_path: &Path,
960 object_store: &ObjectStore,
961 manifest_writer: ManifestWriter,
962 naming_scheme: ManifestNamingScheme,
963 ) -> std::result::Result<ManifestLocation, CommitError> {
964 let path = naming_scheme.manifest_path(base_path, manifest.version);
968 let tmp_path = make_staging_manifest_path(&path)?;
969
970 let res = manifest_writer(object_store, manifest, indices, &tmp_path).await?;
972
973 match object_store
974 .inner
975 .rename_if_not_exists(&tmp_path, &path)
976 .await
977 {
978 Ok(_) => {
979 Ok(ManifestLocation {
981 version: manifest.version,
982 path,
983 size: Some(res.size as u64),
984 naming_scheme,
985 e_tag: None, })
987 }
988 Err(ObjectStoreError::AlreadyExists { .. }) => {
989 let _ = object_store.delete(&tmp_path).await;
992
993 return Err(CommitError::CommitConflict);
994 }
995 Err(e) => {
996 return Err(CommitError::OtherError(e.into()));
998 }
999 }
1000 }
1001}
1002
1003impl Debug for RenameCommitHandler {
1004 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1005 f.debug_struct("RenameCommitHandler").finish()
1006 }
1007}
1008
/// Commit handler that uploads each manifest with a create-only conditional PUT.
pub struct ConditionalPutCommitHandler;
1010
1011#[async_trait::async_trait]
1012impl CommitHandler for ConditionalPutCommitHandler {
1013 async fn commit(
1014 &self,
1015 manifest: &mut Manifest,
1016 indices: Option<Vec<IndexMetadata>>,
1017 base_path: &Path,
1018 object_store: &ObjectStore,
1019 manifest_writer: ManifestWriter,
1020 naming_scheme: ManifestNamingScheme,
1021 ) -> std::result::Result<ManifestLocation, CommitError> {
1022 let path = naming_scheme.manifest_path(base_path, manifest.version);
1023
1024 let memory_store = ObjectStore::memory();
1025 let dummy_path = "dummy";
1026 manifest_writer(&memory_store, manifest, indices, &dummy_path.into()).await?;
1027 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1028 let size = dummy_data.len() as u64;
1029 let res = object_store
1030 .inner
1031 .put_opts(
1032 &path,
1033 dummy_data.into(),
1034 PutOptions {
1035 mode: object_store::PutMode::Create,
1036 ..Default::default()
1037 },
1038 )
1039 .await
1040 .map_err(|err| match err {
1041 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1042 CommitError::CommitConflict
1043 }
1044 _ => CommitError::OtherError(err.into()),
1045 })?;
1046
1047 Ok(ManifestLocation {
1048 version: manifest.version,
1049 path,
1050 size: Some(size),
1051 naming_scheme,
1052 e_tag: res.e_tag,
1053 })
1054 }
1055}
1056
1057impl Debug for ConditionalPutCommitHandler {
1058 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1059 f.debug_struct("ConditionalPutCommitHandler").finish()
1060 }
1061}
1062
/// Commit-time settings. The fields are interpreted by callers outside this file.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries allowed.
    pub num_retries: u32,
    /// Whether automatic cleanup should be skipped.
    pub skip_auto_cleanup: bool,
}

impl Default for CommitConfig {
    /// 20 retries, with automatic cleanup enabled.
    fn default() -> Self {
        CommitConfig {
            skip_auto_cleanup: false,
            num_retries: 20,
        }
    }
}
1078
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    #[test]
    fn test_manifest_naming_scheme() {
        let base = Path::from("base");
        let (v1, v2) = (ManifestNamingScheme::V1, ManifestNamingScheme::V2);

        // (version, expected V1 path, expected V2 path)
        let path_cases = [
            (
                0,
                "base/_versions/0.manifest",
                "base/_versions/18446744073709551615.manifest",
            ),
            (
                42,
                "base/_versions/42.manifest",
                "base/_versions/18446744073709551573.manifest",
            ),
        ];
        for (version, v1_path, v2_path) in path_cases {
            assert_eq!(v1.manifest_path(&base, version), Path::from(v1_path));
            assert_eq!(v2.manifest_path(&base, version), Path::from(v2_path));
        }

        // A staging suffix after the extension must not affect parsing.
        let parse_cases = [
            (v1, "0.manifest", 0),
            (v1, "42.manifest", 42),
            (v1, "42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc", 42),
            (v2, "18446744073709551615.manifest", 0),
            (v2, "18446744073709551573.manifest", 42),
            (
                v2,
                "18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
                42,
            ),
        ];
        for (scheme, filename, expected) in parse_cases {
            assert_eq!(scheme.parse_version(filename), Some(expected), "{filename}");
        }

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // One non-manifest file, one V1 manifest and one V2 manifest.
        let seeded = [
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in &seeded {
            object_store.put(path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // The memory store lists lexically: the V2 names (inverted versions) come
        // first, newest version first, followed by the untouched non-manifest file.
        let expected = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let listed = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|meta| meta.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(listed, expected);
    }

    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        // Declared up front so the temp dir outlives the store that points into it.
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write versions 11 down to 0, so `expected_paths` is newest-first.
        let mut expected_paths = Vec::with_capacity(12);
        for version in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let listed_paths = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(listed_paths, expected_paths);
    }
}
1198}