1use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35 StreamExt, TryStreamExt,
36 future::{self, BoxFuture},
37 stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::PutOptions;
43use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
44use tracing::info;
45use url::Url;
46
47#[cfg(feature = "dynamodb")]
48pub mod dynamodb;
49pub mod external_manifest;
50
51use lance_core::{Error, Result};
52use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
53use lance_io::traits::{WriteExt, Writer};
54
55use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
56use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
57#[cfg(feature = "dynamodb")]
58use {
59 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
60 aws_credential_types::provider::ProvideCredentials,
61 aws_credential_types::provider::error::CredentialsError,
62 lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
63 object_store::aws::AmazonS3ConfigKey,
64 object_store::aws::AwsCredentialProvider,
65 std::borrow::Cow,
66 std::time::{Duration, SystemTime},
67};
68
/// Name of the directory, directly under the dataset root, that holds manifest files.
pub const VERSIONS_DIR: &str = "_versions";
/// File extension used by manifest files (`<stem>.manifest`).
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix marking a detached-version manifest, e.g. `d12345.manifest`.
const DETACHED_VERSION_PREFIX: &str = "d";
72
/// How manifest files inside the `_versions` directory are named.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`, e.g. `42.manifest`.
    V1,
    /// `{u64::MAX - version}.manifest`, zero-padded to 20 digits, so that newer versions
    /// sort before older ones in a lexical listing.
    V2,
}
83
84impl ManifestNamingScheme {
85 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
86 let directory = base.child(VERSIONS_DIR);
87 if is_detached_version(version) {
88 let directory = base.child(VERSIONS_DIR);
93 directory.child(format!(
94 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
95 ))
96 } else {
97 match self {
98 Self::V1 => directory.child(format!("{version}.{MANIFEST_EXTENSION}")),
99 Self::V2 => {
100 let inverted_version = u64::MAX - version;
101 directory.child(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
102 }
103 }
104 }
105 }
106
107 pub fn parse_version(&self, filename: &str) -> Option<u64> {
108 let file_number = filename
109 .split_once('.')
110 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
112 match self {
113 Self::V1 => file_number,
114 Self::V2 => file_number.map(|v| u64::MAX - v),
115 }
116 }
117
118 pub fn parse_detached_version(filename: &str) -> Option<u64> {
122 if !filename.starts_with(DETACHED_VERSION_PREFIX) {
123 return None;
124 }
125 let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
126 without_prefix
127 .split_once('.')
128 .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
129 }
130
131 pub fn detect_scheme(filename: &str) -> Option<Self> {
132 if filename.starts_with(DETACHED_VERSION_PREFIX) {
133 return Some(Self::V2);
135 }
136 if filename.ends_with(MANIFEST_EXTENSION) {
137 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
138 if filename.len() == V2_LEN {
139 Some(Self::V2)
140 } else {
141 Some(Self::V1)
142 }
143 } else {
144 None
145 }
146 }
147
148 pub fn detect_scheme_staging(filename: &str) -> Self {
149 if filename.chars().nth(20) == Some('.') {
152 Self::V2
153 } else {
154 Self::V1
155 }
156 }
157}
158
159pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
169 object_store
170 .inner
171 .list(Some(&dataset_base.child(VERSIONS_DIR)))
172 .try_filter(|res| {
173 let res = if let Some(filename) = res.location.filename() {
174 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
175 } else {
176 false
177 };
178 future::ready(res)
179 })
180 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
181 let filename = meta.location.filename().unwrap();
182 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
183 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
184 object_store.inner.rename(&meta.location, &path).await?;
185 Ok(())
186 })
187 .await?;
188
189 Ok(())
190}
191
/// Function that serializes `manifest` (plus optional `indices` and `transaction`) to `path`
/// in `object_store`, returning the resulting [`WriteResult`].
///
/// Commit handlers receive one of these and choose which path it writes to.
/// [`write_manifest_file_to_path`] matches this signature.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
202
203pub fn write_manifest_file_to_path<'a>(
207 object_store: &'a ObjectStore,
208 manifest: &'a mut Manifest,
209 indices: Option<Vec<IndexMetadata>>,
210 path: &'a Path,
211 transaction: Option<Transaction>,
212) -> BoxFuture<'a, Result<WriteResult>> {
213 Box::pin(async move {
214 let mut object_writer = ObjectWriter::new(object_store, path).await?;
215 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
216 object_writer
217 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
218 .await?;
219 let res = Writer::shutdown(&mut object_writer).await?;
220 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
221 Ok(res)
222 })
223}
224
/// Where a manifest lives and what is known about it.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// Dataset version described by the manifest.
    pub version: u64,
    /// Path of the manifest file in the object store.
    pub path: Path,
    /// File size in bytes, if known. It is `None` when the location was computed without
    /// inspecting the object (e.g. the V1 fallback in `default_resolve_version`).
    pub size: Option<u64>,
    /// Naming scheme that `path` follows.
    pub naming_scheme: ManifestNamingScheme,
    /// Entity tag reported by the store or filesystem, if any.
    pub e_tag: Option<String>,
}
240
241impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
242 type Error = Error;
243
244 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
245 let filename = meta.location.filename().ok_or_else(|| {
246 Error::internal("ObjectMeta location does not have a filename".to_string())
247 })?;
248 let scheme = ManifestNamingScheme::detect_scheme(filename)
249 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
250 let version = scheme
251 .parse_version(filename)
252 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
253 Ok(Self {
254 version,
255 path: meta.location,
256 size: Some(meta.size),
257 naming_scheme: scheme,
258 e_tag: meta.e_tag,
259 })
260 }
261}
262
/// Finds the location of the newest manifest under `base/_versions`.
///
/// Local stores first try a direct `read_dir` scan (`current_manifest_local`). If that
/// errors or finds nothing, the store listing is used instead. In the listing, only files
/// with a recognized scheme and parseable version are considered.
///
/// # Errors
/// Returns `not_found` if no manifest exists. Returns an internal error if a non-lexical
/// (full-scan) listing mixes naming schemes.
async fn current_manifest_path(
    object_store: &ObjectStore,
    base: &Path,
) -> Result<ManifestLocation> {
    // NOTE(review): `current_manifest_local` uses blocking `std::fs` calls inside this
    // async fn; presumably acceptable because it is a single directory scan — confirm.
    if object_store.is_local()
        && let Ok(Some(location)) = current_manifest_local(base)
    {
        return Ok(location);
    }

    let manifest_files = object_store.list(Some(base.child(VERSIONS_DIR)));

    // Keep only entries that look like manifests *and* carry a parseable version; this
    // drops detached (`d…`) manifests and unrelated files.
    let mut valid_manifests = manifest_files.try_filter_map(|res| {
        let filename = res.location.filename().unwrap();
        if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
            if scheme.parse_version(filename).is_some() {
                future::ready(Ok(Some((scheme, res))))
            } else {
                future::ready(Ok(None))
            }
        } else {
            future::ready(Ok(None))
        }
    });

    let first = valid_manifests.next().await.transpose()?;
    match (first, object_store.list_is_lexically_ordered) {
        // Fast path: with a lexically ordered listing, the first V2 entry has the
        // smallest inverted name and therefore the highest version.
        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
            let version = scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();

            // Sanity-check up to 999 following entries; problems are only logged and the
            // first entry is still returned.
            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
                if scheme != ManifestNamingScheme::V2 {
                    warn!(
                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
                         to migrate the directory."
                    );
                    break;
                }
                let next_version = scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if next_version >= version {
                    warn!(
                        "List operation was expected to be lexically ordered, but was not. This \
                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
                         GitHub repository."
                    );
                    break;
                }
            }

            Ok(ManifestLocation {
                version,
                path: meta.location,
                size: Some(meta.size),
                naming_scheme: scheme,
                e_tag: meta.e_tag,
            })
        }
        // Slow path: scan every entry and keep the maximum version, rejecting mixed schemes.
        (Some((first_scheme, meta)), _) => {
            let mut current_version = first_scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();
            let mut current_meta = meta;
            let scheme = first_scheme;

            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
                if entry_scheme != scheme {
                    return Err(Error::internal(format!(
                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
                         Use `migrate_manifest_paths_v2` to migrate the directory.",
                        scheme, entry_scheme
                    )));
                }
                let version = entry_scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if version > current_version {
                    current_version = version;
                    current_meta = meta;
                }
            }
            Ok(ManifestLocation {
                version: current_version,
                path: current_meta.location,
                size: Some(current_meta.size),
                naming_scheme: scheme,
                e_tag: current_meta.e_tag,
            })
        }
        (None, _) => Err(Error::not_found(base.child(VERSIONS_DIR).to_string())),
    }
}
367
368fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
372 let path = lance_io::local::to_local_path(&base.child(VERSIONS_DIR));
373 let entries = std::fs::read_dir(path)?;
374
375 let mut latest_entry: Option<(u64, DirEntry)> = None;
376
377 let mut scheme: Option<ManifestNamingScheme> = None;
378
379 for entry in entries {
380 let entry = entry?;
381 let filename_raw = entry.file_name();
382 let filename = filename_raw.to_string_lossy();
383
384 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
385 continue;
388 };
389
390 if let Some(scheme) = scheme {
391 if scheme != entry_scheme {
392 return Err(io::Error::new(
393 io::ErrorKind::InvalidData,
394 format!(
395 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
396 scheme, entry_scheme
397 ),
398 ));
399 }
400 } else {
401 scheme = Some(entry_scheme);
402 }
403
404 let Some(version) = entry_scheme.parse_version(&filename) else {
405 continue;
406 };
407
408 if let Some((latest_version, _)) = &latest_entry {
409 if version > *latest_version {
410 latest_entry = Some((version, entry));
411 }
412 } else {
413 latest_entry = Some((version, entry));
414 }
415 }
416
417 if let Some((version, entry)) = latest_entry {
418 let path = Path::from_filesystem_path(entry.path())
419 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
420 let metadata = entry.metadata()?;
421 Ok(Some(ManifestLocation {
422 version,
423 path,
424 size: Some(metadata.len()),
425 naming_scheme: scheme.unwrap(),
426 e_tag: Some(get_etag(&metadata)),
427 }))
428 } else {
429 Ok(None)
430 }
431}
432
433fn list_manifests<'a>(
434 base_path: &Path,
435 object_store: &'a dyn OSObjectStore,
436) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
437 object_store
438 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
439 .filter_map(|obj_meta| {
440 futures::future::ready(
441 obj_meta
442 .map(|m| ManifestLocation::try_from(m).ok())
443 .transpose(),
444 )
445 })
446 .boxed()
447}
448
449fn detached_manifest_location_from_meta(
451 meta: object_store::ObjectMeta,
452) -> Option<ManifestLocation> {
453 let filename = meta.location.filename()?;
454 let version = ManifestNamingScheme::parse_detached_version(filename)?;
455 Some(ManifestLocation {
456 version,
457 path: meta.location,
458 size: Some(meta.size),
459 naming_scheme: ManifestNamingScheme::V2,
460 e_tag: meta.e_tag,
461 })
462}
463
464pub fn list_detached_manifests<'a>(
466 base_path: &Path,
467 object_store: &'a dyn OSObjectStore,
468) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
469 object_store
470 .read_dir_all(&base_path.child(VERSIONS_DIR), None)
471 .filter_map(|obj_meta| {
472 futures::future::ready(
473 obj_meta
474 .map(detached_manifest_location_from_meta)
475 .transpose(),
476 )
477 })
478 .boxed()
479}
480
481fn make_staging_manifest_path(base: &Path) -> Result<Path> {
482 let id = uuid::Uuid::new_v4().to_string();
483 Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
484}
485
/// Query-string key that carries the DynamoDB table name in `s3+ddb://` URLs.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
488
489#[async_trait::async_trait]
498#[allow(clippy::too_many_arguments)]
499pub trait CommitHandler: Debug + Send + Sync {
500 async fn resolve_latest_location(
501 &self,
502 base_path: &Path,
503 object_store: &ObjectStore,
504 ) -> Result<ManifestLocation> {
505 Ok(current_manifest_path(object_store, base_path).await?)
506 }
507
508 async fn resolve_version_location(
509 &self,
510 base_path: &Path,
511 version: u64,
512 object_store: &dyn OSObjectStore,
513 ) -> Result<ManifestLocation> {
514 default_resolve_version(base_path, version, object_store).await
515 }
516
517 fn list_detached_manifest_locations<'a>(
521 &self,
522 base_path: &Path,
523 object_store: &'a ObjectStore,
524 ) -> BoxStream<'a, Result<ManifestLocation>> {
525 list_detached_manifests(base_path, &object_store.inner).boxed()
526 }
527
528 fn list_manifest_locations<'a>(
535 &self,
536 base_path: &Path,
537 object_store: &'a ObjectStore,
538 sorted_descending: bool,
539 ) -> BoxStream<'a, Result<ManifestLocation>> {
540 let underlying_stream = list_manifests(base_path, &object_store.inner);
541
542 if !sorted_descending {
543 return underlying_stream.boxed();
544 }
545
546 async fn sort_stream(
547 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
548 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
549 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
550 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
551 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
552 }
553
554 if object_store.list_is_lexically_ordered {
557 let mut peekable = underlying_stream.peekable();
559
560 futures::stream::once(async move {
561 let naming_scheme = match Pin::new(&mut peekable).peek().await {
562 Some(Ok(m)) => m.naming_scheme,
563 Some(Err(_)) => ManifestNamingScheme::V2,
566 None => ManifestNamingScheme::V2,
567 };
568
569 if naming_scheme == ManifestNamingScheme::V2 {
570 Ok(Either::Left(peekable))
572 } else {
573 sort_stream(peekable).await.map(Either::Right)
574 }
575 })
576 .try_flatten()
577 .boxed()
578 } else {
579 futures::stream::once(sort_stream(underlying_stream))
584 .try_flatten()
585 .boxed()
586 }
587 }
588
589 async fn commit(
594 &self,
595 manifest: &mut Manifest,
596 indices: Option<Vec<IndexMetadata>>,
597 base_path: &Path,
598 object_store: &ObjectStore,
599 manifest_writer: ManifestWriter,
600 naming_scheme: ManifestNamingScheme,
601 transaction: Option<Transaction>,
602 ) -> std::result::Result<ManifestLocation, CommitError>;
603
604 async fn delete(&self, _base_path: &Path) -> Result<()> {
606 Ok(())
607 }
608}
609
610async fn default_resolve_version(
611 base_path: &Path,
612 version: u64,
613 object_store: &dyn OSObjectStore,
614) -> Result<ManifestLocation> {
615 if is_detached_version(version) {
616 return Ok(ManifestLocation {
617 version,
618 naming_scheme: ManifestNamingScheme::V2,
621 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
623 size: None,
624 e_tag: None,
625 });
626 }
627
628 let scheme = ManifestNamingScheme::V2;
630 let path = scheme.manifest_path(base_path, version);
631 match object_store.head(&path).await {
632 Ok(meta) => Ok(ManifestLocation {
633 version,
634 path,
635 size: Some(meta.size),
636 naming_scheme: scheme,
637 e_tag: meta.e_tag,
638 }),
639 Err(ObjectStoreError::NotFound { .. }) => {
640 let scheme = ManifestNamingScheme::V1;
642 Ok(ManifestLocation {
643 version,
644 path: scheme.manifest_path(base_path, version),
645 size: None,
646 naming_scheme: scheme,
647 e_tag: None,
648 })
649 }
650 Err(e) => Err(e.into()),
651 }
652}
/// Adapts an `object_store` AWS credential provider to the AWS SDK `ProvideCredentials`
/// trait so it can be handed to the DynamoDB client.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
657
/// Fetches credentials from the wrapped provider and reports them with a fixed
/// ten-minute expiry. Provider failures become `CredentialsError::provider_error`.
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };
            let expires_at = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
688
/// Builds a DynamoDB-backed external manifest store for `table_name`.
///
/// The client uses `creds` for authentication, `region`, an optional custom `endpoint`,
/// standard retries (5 attempts), and no SDK identity cache.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    // NOTE(review): SDK identity caching is disabled, presumably because the wrapped
    // object_store provider handles refresh itself — confirm.
    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    let dynamodb_config = match endpoint {
        Some(url) => base_config.endpoint_url(url),
        None => base_config,
    };
    let client = Client::from_conf(dynamodb_config.build());

    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
720
721pub async fn commit_handler_from_url(
722 url_or_path: &str,
723 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
725) -> Result<Arc<dyn CommitHandler>> {
726 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
727 Arc::new(RenameCommitHandler)
728 } else {
729 Arc::new(ConditionalPutCommitHandler)
730 };
731
732 let url = match Url::parse(url_or_path) {
733 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
734 return Ok(local_handler);
736 }
737 Ok(url) => url,
738 Err(_) => {
739 return Ok(local_handler);
740 }
741 };
742
743 match url.scheme() {
744 "file" | "file-object-store" => Ok(local_handler),
745 "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" => {
746 Ok(Arc::new(ConditionalPutCommitHandler))
747 }
748 #[cfg(not(feature = "dynamodb"))]
749 "s3+ddb" => Err(Error::invalid_input_source(
750 "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
751 )),
752 #[cfg(feature = "dynamodb")]
753 "s3+ddb" => {
754 if url.query_pairs().count() != 1 {
755 return Err(Error::invalid_input_source(
756 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
757 ));
758 }
759 let table_name = match url.query_pairs().next() {
760 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
761 if key == DDB_URL_QUERY_KEY =>
762 {
763 if table_name.is_empty() {
764 return Err(Error::invalid_input_source(
765 "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
766 ));
767 }
768 table_name
769 }
770 _ => {
771 return Err(Error::invalid_input_source(
772 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
773 ));
774 }
775 };
776 let options = options.clone().unwrap_or_default();
777 let storage_options_raw =
778 StorageOptions(options.storage_options().cloned().unwrap_or_default());
779 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
780 let storage_options = storage_options_raw.as_s3_options();
781
782 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
783
784 let accessor = options.get_accessor();
786
787 let (aws_creds, region) = build_aws_credential(
788 options.s3_credentials_refresh_offset,
789 options.aws_credentials.clone(),
790 Some(&storage_options),
791 region,
792 accessor,
793 )
794 .await?;
795
796 Ok(Arc::new(ExternalManifestCommitHandler {
797 external_manifest_store: build_dynamodb_external_store(
798 table_name,
799 aws_creds.clone(),
800 ®ion,
801 dynamo_endpoint,
802 "lancedb",
803 )
804 .await?,
805 }))
806 }
807 _ => Ok(Arc::new(UnsafeCommitHandler)),
808 }
809}
810
/// Returns the DynamoDB endpoint override.
///
/// The `dynamodb_endpoint` storage option takes precedence over the `DYNAMODB_ENDPOINT`
/// environment variable. The environment is only read when the option is absent.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
819
/// Error returned by [`CommitHandler::commit`].
#[derive(Debug)]
pub enum CommitError {
    /// A manifest for the target version already exists.
    CommitConflict,
    /// Any other failure while committing.
    OtherError(Error),
}
828
829impl From<Error> for CommitError {
830 fn from(e: Error) -> Self {
831 Self::OtherError(e)
832 }
833}
834
835impl From<CommitError> for Error {
836 fn from(e: CommitError) -> Self {
837 match e {
838 CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
839 CommitError::OtherError(e) => e,
840 }
841 }
842}
843
/// Records whether the `UnsafeCommitHandler` warning has already been logged.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);

/// Commit handler that writes the manifest directly to its final path without checking
/// whether it already exists, so concurrent writers can overwrite each other.
pub struct UnsafeCommitHandler;
851
852#[async_trait::async_trait]
853#[allow(clippy::too_many_arguments)]
854impl CommitHandler for UnsafeCommitHandler {
855 async fn commit(
856 &self,
857 manifest: &mut Manifest,
858 indices: Option<Vec<IndexMetadata>>,
859 base_path: &Path,
860 object_store: &ObjectStore,
861 manifest_writer: ManifestWriter,
862 naming_scheme: ManifestNamingScheme,
863 transaction: Option<Transaction>,
864 ) -> std::result::Result<ManifestLocation, CommitError> {
865 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
867 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
868 log::warn!(
869 "Using unsafe commit handler. Concurrent writes may result in data loss. \
870 Consider providing a commit handler that prevents conflicting writes."
871 );
872 }
873
874 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
875 let res =
876 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
877
878 Ok(ManifestLocation {
879 version: manifest.version,
880 size: Some(res.size as u64),
881 naming_scheme,
882 path: version_path,
883 e_tag: res.e_tag,
884 })
885 }
886}
887
888impl Debug for UnsafeCommitHandler {
889 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
890 f.debug_struct("UnsafeCommitHandler").finish()
891 }
892}
893
/// A lock held while a manifest version is being written.
///
/// Every `CommitLock` is also a [`CommitHandler`] through a blanket impl. That impl calls
/// [`CommitLock::lock`] for the version, checks that no manifest exists at the target path,
/// writes it, and then calls [`CommitLease::release`] with whether the write succeeded.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Lease handed back by a successful [`CommitLock::lock`].
    type Lease: CommitLease;

    /// Acquires the lock for committing `version`. Errors are propagated unchanged to the
    /// commit caller.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
913
/// Handle to a held [`CommitLock`].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Releases the lease. `success` is `true` only when the manifest write succeeded.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
919
920#[async_trait::async_trait]
921impl<T: CommitLock + Send + Sync> CommitHandler for T {
922 async fn commit(
923 &self,
924 manifest: &mut Manifest,
925 indices: Option<Vec<IndexMetadata>>,
926 base_path: &Path,
927 object_store: &ObjectStore,
928 manifest_writer: ManifestWriter,
929 naming_scheme: ManifestNamingScheme,
930 transaction: Option<Transaction>,
931 ) -> std::result::Result<ManifestLocation, CommitError> {
932 let path = naming_scheme.manifest_path(base_path, manifest.version);
933 let lease = self.lock(manifest.version).await?;
936
937 match object_store.inner.head(&path).await {
939 Ok(_) => {
940 lease.release(false).await?;
943
944 return Err(CommitError::CommitConflict);
945 }
946 Err(ObjectStoreError::NotFound { .. }) => {}
947 Err(e) => {
948 lease.release(false).await?;
951
952 return Err(CommitError::OtherError(e.into()));
953 }
954 }
955 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
956
957 lease.release(res.is_ok()).await?;
959
960 let res = res?;
961 Ok(ManifestLocation {
962 version: manifest.version,
963 size: Some(res.size as u64),
964 naming_scheme,
965 path,
966 e_tag: res.e_tag,
967 })
968 }
969}
970
971#[async_trait::async_trait]
972impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
973 async fn commit(
974 &self,
975 manifest: &mut Manifest,
976 indices: Option<Vec<IndexMetadata>>,
977 base_path: &Path,
978 object_store: &ObjectStore,
979 manifest_writer: ManifestWriter,
980 naming_scheme: ManifestNamingScheme,
981 transaction: Option<Transaction>,
982 ) -> std::result::Result<ManifestLocation, CommitError> {
983 self.as_ref()
984 .commit(
985 manifest,
986 indices,
987 base_path,
988 object_store,
989 manifest_writer,
990 naming_scheme,
991 transaction,
992 )
993 .await
994 }
995}
996
/// Commit handler that writes the manifest to a staging path and then moves it into place
/// with `rename_if_not_exists`, reporting a conflict if the target already exists.
pub struct RenameCommitHandler;
1001
1002#[async_trait::async_trait]
1003impl CommitHandler for RenameCommitHandler {
1004 async fn commit(
1005 &self,
1006 manifest: &mut Manifest,
1007 indices: Option<Vec<IndexMetadata>>,
1008 base_path: &Path,
1009 object_store: &ObjectStore,
1010 manifest_writer: ManifestWriter,
1011 naming_scheme: ManifestNamingScheme,
1012 transaction: Option<Transaction>,
1013 ) -> std::result::Result<ManifestLocation, CommitError> {
1014 let path = naming_scheme.manifest_path(base_path, manifest.version);
1018 let tmp_path = make_staging_manifest_path(&path)?;
1019
1020 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1021
1022 match object_store
1023 .inner
1024 .rename_if_not_exists(&tmp_path, &path)
1025 .await
1026 {
1027 Ok(_) => {
1028 Ok(ManifestLocation {
1030 version: manifest.version,
1031 path,
1032 size: Some(res.size as u64),
1033 naming_scheme,
1034 e_tag: None, })
1036 }
1037 Err(ObjectStoreError::AlreadyExists { .. }) => {
1038 let _ = object_store.delete(&tmp_path).await;
1041
1042 return Err(CommitError::CommitConflict);
1043 }
1044 Err(e) => {
1045 return Err(CommitError::OtherError(e.into()));
1047 }
1048 }
1049 }
1050}
1051
1052impl Debug for RenameCommitHandler {
1053 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1054 f.debug_struct("RenameCommitHandler").finish()
1055 }
1056}
1057
/// Commit handler that uploads the finished manifest with a conditional PUT
/// (`PutMode::Create`), reporting a conflict if the object already exists.
pub struct ConditionalPutCommitHandler;
1059
1060#[async_trait::async_trait]
1061impl CommitHandler for ConditionalPutCommitHandler {
1062 async fn commit(
1063 &self,
1064 manifest: &mut Manifest,
1065 indices: Option<Vec<IndexMetadata>>,
1066 base_path: &Path,
1067 object_store: &ObjectStore,
1068 manifest_writer: ManifestWriter,
1069 naming_scheme: ManifestNamingScheme,
1070 transaction: Option<Transaction>,
1071 ) -> std::result::Result<ManifestLocation, CommitError> {
1072 let path = naming_scheme.manifest_path(base_path, manifest.version);
1073
1074 let memory_store = ObjectStore::memory();
1075 let dummy_path = "dummy";
1076 manifest_writer(
1077 &memory_store,
1078 manifest,
1079 indices,
1080 &dummy_path.into(),
1081 transaction,
1082 )
1083 .await?;
1084 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1085 let size = dummy_data.len() as u64;
1086 let res = object_store
1087 .inner
1088 .put_opts(
1089 &path,
1090 dummy_data.into(),
1091 PutOptions {
1092 mode: object_store::PutMode::Create,
1093 ..Default::default()
1094 },
1095 )
1096 .await
1097 .map_err(|err| match err {
1098 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1099 CommitError::CommitConflict
1100 }
1101 _ => CommitError::OtherError(err.into()),
1102 })?;
1103
1104 Ok(ManifestLocation {
1105 version: manifest.version,
1106 path,
1107 size: Some(size),
1108 naming_scheme,
1109 e_tag: res.e_tag,
1110 })
1111 }
1112}
1113
1114impl Debug for ConditionalPutCommitHandler {
1115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1116 f.debug_struct("ConditionalPutCommitHandler").finish()
1117 }
1118}
1119
/// Settings for committing a new dataset version.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of commit retries. NOTE(review): presumably retries after a commit
    /// conflict; confirm against callers.
    pub num_retries: u32,
    /// Whether automatic cleanup is skipped after a commit. NOTE(review): the cleanup
    /// itself is implemented by callers; confirm semantics there.
    pub skip_auto_cleanup: bool,
}
1126
1127impl Default for CommitConfig {
1128 fn default() -> Self {
1129 Self {
1130 num_retries: 20,
1131 skip_auto_cleanup: false,
1132 }
1133 }
1134}
1135
#[cfg(test)]
mod tests {
    use lance_core::utils::tempfile::TempObjDir;

    use super::*;

    /// Round-trips versions through both naming schemes and checks scheme detection.
    #[test]
    fn test_manifest_naming_scheme() {
        let v1 = ManifestNamingScheme::V1;
        let v2 = ManifestNamingScheme::V2;

        assert_eq!(
            v1.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/0.manifest")
        );
        assert_eq!(
            v1.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/42.manifest")
        );

        assert_eq!(
            v2.manifest_path(&Path::from("base"), 0),
            Path::from("base/_versions/18446744073709551615.manifest")
        );
        assert_eq!(
            v2.manifest_path(&Path::from("base"), 42),
            Path::from("base/_versions/18446744073709551573.manifest")
        );

        assert_eq!(v1.parse_version("0.manifest"), Some(0));
        assert_eq!(v1.parse_version("42.manifest"), Some(42));
        // Staging names (`<name>-<uuid>`) still parse.
        assert_eq!(
            v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
        assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
        assert_eq!(
            v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
            Some(42)
        );

        assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
        assert_eq!(
            ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
            Some(v2)
        );
        assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
    }

    /// Migration renames V1 manifests to V2 and leaves other files alone.
    #[tokio::test]
    async fn test_manifest_naming_migration() {
        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // A non-manifest file, a V1 manifest, and an existing V2 manifest.
        let original_files = vec![
            versions_dir.child("irrelevant"),
            ManifestNamingScheme::V1.manifest_path(&base, 0),
            ManifestNamingScheme::V2.manifest_path(&base, 1),
        ];
        for path in original_files {
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        migrate_scheme_to_v2(&object_store, &base).await.unwrap();

        // Expected in lexical path order: inverted V2 names first, then "irrelevant".
        let expected_files = vec![
            ManifestNamingScheme::V2.manifest_path(&base, 1),
            ManifestNamingScheme::V2.manifest_path(&base, 0),
            versions_dir.child("irrelevant"),
        ];
        let actual_files = object_store
            .inner
            .list(Some(&versions_dir))
            .map_ok(|res| res.location)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        assert_eq!(actual_files, expected_files);
    }

    /// `list_manifest_locations(.., true)` yields newest-first for both schemes, on both a
    /// lexically ordered (memory) store and a non-lexical (local) store.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_list_manifests_sorted(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let tempdir;
        let (object_store, base) = if lexical_list_store {
            (Box::new(ObjectStore::memory()), Path::from("base"))
        } else {
            tempdir = TempObjDir::default();
            let path = tempdir.child("base");
            let store = Box::new(ObjectStore::local());
            assert!(!store.list_is_lexically_ordered);
            (store, path)
        };

        // Write versions 11..=0 so `expected_paths` is already in descending order.
        let mut expected_paths = Vec::new();
        for i in (0..12).rev() {
            let path = naming_scheme.manifest_path(&base, i);
            object_store.put(&path, b"".as_slice()).await.unwrap();
            expected_paths.push(path);
        }

        let actual_versions = ConditionalPutCommitHandler
            .list_manifest_locations(&base, &object_store, true)
            .map_ok(|location| location.path)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(actual_versions, expected_paths);
    }

    /// `current_manifest_path` finds the highest version regardless of write order.
    #[tokio::test]
    #[rstest::rstest]
    async fn test_current_manifest_path(
        #[values(true, false)] lexical_list_store: bool,
        #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
        naming_scheme: ManifestNamingScheme,
    ) {
        let mut object_store = ObjectStore::memory();
        // Override the flag so both match arms in `current_manifest_path` are exercised.
        object_store.list_is_lexically_ordered = lexical_list_store;
        let object_store = Box::new(object_store);
        let base = Path::from("base");

        // Versions written in scrambled order.
        for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
            let path = naming_scheme.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let location = current_manifest_path(&object_store, &base).await.unwrap();

        assert_eq!(location.version, 11);
        assert_eq!(location.naming_scheme, naming_scheme);
        assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
    }

    /// Detached names need the `d` prefix and a `.` separator.
    #[test]
    fn test_parse_detached_version() {
        // Valid detached names.
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d12345.manifest"),
            Some(12345)
        );
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("d9223372036854775808.manifest"),
            Some(9223372036854775808)
        );

        // Missing the `d` prefix.
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("12345.manifest"),
            None
        );

        // A regular V2 manifest name.
        assert_eq!(
            ManifestNamingScheme::parse_detached_version("18446744073709551615.manifest"),
            None
        );

        // No `.` separator.
        assert_eq!(ManifestNamingScheme::parse_detached_version("d12345"), None);
    }

    /// `list_detached_manifests` returns only detached manifests, all as V2.
    #[tokio::test]
    async fn test_list_detached_manifests() {
        use crate::format::DETACHED_VERSION_MASK;
        use futures::TryStreamExt;

        let object_store = ObjectStore::memory();
        let base = Path::from("base");
        let versions_dir = base.child(VERSIONS_DIR);

        // Regular V2 manifests, which must be ignored.
        for version in [1, 2, 3] {
            let path = ManifestNamingScheme::V2.manifest_path(&base, version);
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        // Detached manifests written by hand with the `d` prefix.
        let detached_versions: Vec<u64> = vec![
            100 | DETACHED_VERSION_MASK,
            200 | DETACHED_VERSION_MASK,
            300 | DETACHED_VERSION_MASK,
        ];
        for version in &detached_versions {
            let path = versions_dir.child(format!("d{}.manifest", version));
            object_store.put(&path, b"".as_slice()).await.unwrap();
        }

        let detached_locations: Vec<ManifestLocation> =
            list_detached_manifests(&base, &object_store.inner)
                .try_collect()
                .await
                .unwrap();

        assert_eq!(detached_locations.len(), 3);
        for loc in &detached_locations {
            assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
        }

        let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
        found_versions.sort();
        let mut expected_versions = detached_versions.clone();
        expected_versions.sort();
        assert_eq!(found_versions, expected_versions);
    }
}