1use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35 StreamExt, TryStreamExt,
36 future::{self, BoxFuture},
37 stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::ObjectStoreExt as OSObjectStoreExt;
43use object_store::PutOptions;
44use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::{WriteExt, Writer};
55
56use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61 aws_credential_types::provider::ProvideCredentials,
62 aws_credential_types::provider::error::CredentialsError,
63 lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
64 object_store::aws::AmazonS3ConfigKey,
65 object_store::aws::AwsCredentialProvider,
66 std::borrow::Cow,
67 std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, directly under the dataset base path, that holds manifest files.
pub const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) used by manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Prefix placed in front of the version number in detached manifest filenames.
const DETACHED_VERSION_PREFIX: &str = "d";
/// Filename of the version hint stored inside [`VERSIONS_DIR`].
const VERSION_HINT_FILE: &str = "latest_version_hint.json";
80
81#[derive(Clone, Copy, Debug, PartialEq, Eq)]
83pub enum ManifestNamingScheme {
84 V1,
86 V2,
90}
91
92impl ManifestNamingScheme {
93 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
94 if is_detached_version(version) {
95 base.clone().join(VERSIONS_DIR).join(format!(
100 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
101 ))
102 } else {
103 let directory = base.clone().join(VERSIONS_DIR);
104 match self {
105 Self::V1 => directory.join(format!("{version}.{MANIFEST_EXTENSION}")),
106 Self::V2 => {
107 let inverted_version = u64::MAX - version;
108 directory.join(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
109 }
110 }
111 }
112 }
113
114 pub fn parse_version(&self, filename: &str) -> Option<u64> {
115 let file_number = filename
116 .split_once('.')
117 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
119 match self {
120 Self::V1 => file_number,
121 Self::V2 => file_number.map(|v| u64::MAX - v),
122 }
123 }
124
125 pub fn parse_detached_version(filename: &str) -> Option<u64> {
129 if !filename.starts_with(DETACHED_VERSION_PREFIX) {
130 return None;
131 }
132 let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
133 without_prefix
134 .split_once('.')
135 .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
136 }
137
138 pub fn detect_scheme(filename: &str) -> Option<Self> {
139 if filename.starts_with(DETACHED_VERSION_PREFIX) {
140 return Some(Self::V2);
142 }
143 if filename.ends_with(MANIFEST_EXTENSION) {
144 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
145 if filename.len() == V2_LEN {
146 Some(Self::V2)
147 } else {
148 Some(Self::V1)
149 }
150 } else {
151 None
152 }
153 }
154
155 pub fn detect_scheme_staging(filename: &str) -> Self {
156 if filename.chars().nth(20) == Some('.') {
159 Self::V2
160 } else {
161 Self::V1
162 }
163 }
164}
165
166pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
176 object_store
177 .inner
178 .list(Some(&dataset_base.clone().join(VERSIONS_DIR)))
179 .try_filter(|res| {
180 let res = if let Some(filename) = res.location.filename() {
181 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
182 } else {
183 false
184 };
185 future::ready(res)
186 })
187 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
188 let filename = meta.location.filename().unwrap();
189 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
190 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
191 object_store.inner.rename(&meta.location, &path).await?;
192 Ok(())
193 })
194 .await?;
195
196 Ok(())
197}
198
/// Signature of a function that writes a manifest to `path` in `object_store`.
///
/// The writer receives the manifest (mutably), optional index metadata and an
/// optional transaction, and resolves to the [`WriteResult`] of the write.
/// [`write_manifest_file_to_path`] has this signature.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
209
210pub fn write_manifest_file_to_path<'a>(
214 object_store: &'a ObjectStore,
215 manifest: &'a mut Manifest,
216 indices: Option<Vec<IndexMetadata>>,
217 path: &'a Path,
218 transaction: Option<Transaction>,
219) -> BoxFuture<'a, Result<WriteResult>> {
220 Box::pin(async move {
221 let mut object_writer = ObjectWriter::new(object_store, path).await?;
222 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
223 object_writer
224 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
225 .await?;
226 let res = Writer::shutdown(&mut object_writer).await?;
227 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
228 Ok(res)
229 })
230}
231
232#[derive(Debug, Clone)]
233pub struct ManifestLocation {
234 pub version: u64,
236 pub path: Path,
238 pub size: Option<u64>,
240 pub naming_scheme: ManifestNamingScheme,
242 pub e_tag: Option<String>,
246}
247
248impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
249 type Error = Error;
250
251 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
252 let filename = meta.location.filename().ok_or_else(|| {
253 Error::internal("ObjectMeta location does not have a filename".to_string())
254 })?;
255 let scheme = ManifestNamingScheme::detect_scheme(filename)
256 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
257 let version = scheme
258 .parse_version(filename)
259 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
260 Ok(Self {
261 version,
262 path: meta.location,
263 size: Some(meta.size),
264 naming_scheme: scheme,
265 e_tag: meta.e_tag,
266 })
267 }
268}
269
270async fn current_manifest_path(
280 object_store: &ObjectStore,
281 base: &Path,
282) -> Result<ManifestLocation> {
283 if object_store.is_local() {
284 if let Ok(Some(location)) = current_manifest_local(base) {
285 return Ok(location);
286 }
287 } else if uses_version_hint(object_store)
288 && let Some(location) = read_version_hint_and_probe(object_store, base).await
289 {
290 return Ok(location);
291 }
292
293 resolve_version_from_listing(object_store, base).await
294}
295
/// JSON payload of the version hint file: a single `version` field.
#[derive(serde::Serialize, serde::Deserialize)]
struct VersionHint {
    /// Version number recorded by the writer of the hint.
    version: u64,
}
301
/// Environment variable that can switch the version hint mechanism off.
const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT";

/// Whether version hints are enabled for this process.
///
/// The environment variable is read once and the answer is cached. Hints are
/// enabled unless the variable is set to `0`, `false` or `off` (surrounding
/// whitespace and ASCII case are ignored); an unset or unreadable variable
/// leaves them enabled.
fn version_hint_globally_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| {
        let Ok(raw) = std::env::var(VERSION_HINT_ENV) else {
            return true;
        };
        let normalized = raw.trim().to_ascii_lowercase();
        !["0", "false", "off"].contains(&normalized.as_str())
    })
}
318
319pub fn uses_version_hint(object_store: &ObjectStore) -> bool {
328 version_hint_globally_enabled() && !object_store.list_is_lexically_ordered
329}
330
331fn version_hint_path(base: &Path) -> Path {
333 base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE)
334}
335
336pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) {
344 if is_detached_version(version) || !uses_version_hint(object_store) {
345 return;
346 }
347 let hint_path = version_hint_path(base);
348 let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint");
349 if let Err(e) = object_store.put(&hint_path, content.as_slice()).await {
350 warn!("Failed to write version hint file for version {version}: {e}");
351 }
352}
353
354async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option<u64> {
357 let bytes = object_store
358 .inner
359 .get(&version_hint_path(base))
360 .await
361 .ok()?
362 .bytes()
363 .await
364 .ok()?;
365 Some(serde_json::from_slice::<VersionHint>(&bytes).ok()?.version)
366}
367
368async fn read_version_hint_and_probe(
373 object_store: &ObjectStore,
374 base: &Path,
375) -> Option<ManifestLocation> {
376 let hint_version = read_version_from_hint(object_store, base).await?;
377 let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version)
378 .await
379 .ok()
380 .flatten()?;
381 let (_, meta) = probed.pop()?;
383 Some(ManifestLocation {
384 version,
385 path: scheme.manifest_path(base, version),
386 size: Some(meta.size),
387 naming_scheme: scheme,
388 e_tag: meta.e_tag,
389 })
390}
391
/// Largest distance between the hinted version and the requested `since_version`
/// for which `list_manifests_since_version_with_hint` answers from the hint;
/// beyond it that function returns `None`.
const MAX_HINT_PROBE_GAP: u64 = 1000;
396
397async fn probe_versions_upward(
414 object_store: &ObjectStore,
415 base: &Path,
416 from_version: u64,
417) -> Result<
418 Option<(
419 u64,
420 ManifestNamingScheme,
421 Vec<(u64, object_store::ObjectMeta)>,
422 )>,
423> {
424 let mut scheme = ManifestNamingScheme::V2;
426 let meta = match object_store
427 .inner
428 .head(&scheme.manifest_path(base, from_version))
429 .await
430 {
431 Ok(meta) => meta,
432 Err(ObjectStoreError::NotFound { .. }) => {
433 scheme = ManifestNamingScheme::V1;
434 match object_store
435 .inner
436 .head(&scheme.manifest_path(base, from_version))
437 .await
438 {
439 Ok(meta) => meta,
440 Err(ObjectStoreError::NotFound { .. }) => return Ok(None),
441 Err(e) => return Err(e.into()),
442 }
443 }
444 Err(e) => return Err(e.into()),
445 };
446
447 let mut probed = vec![(from_version, meta)];
448 let mut version = from_version;
449 loop {
450 let next = version + 1;
451 match object_store
452 .inner
453 .head(&scheme.manifest_path(base, next))
454 .await
455 {
456 Ok(meta) => {
457 probed.push((next, meta));
458 version = next;
459 }
460 Err(ObjectStoreError::NotFound { .. }) => break,
462 Err(e) => return Err(e.into()),
465 }
466 }
467 Ok(Some((version, scheme, probed)))
468}
469
470async fn list_manifests_since_version_with_hint(
477 object_store: &ObjectStore,
478 base: &Path,
479 since_version: u64,
480) -> Option<Vec<ManifestLocation>> {
481 let hint_version = read_version_from_hint(object_store, base).await?;
482
483 if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP {
486 return None;
487 }
488
489 let probe_from = if hint_version > since_version {
492 hint_version
493 } else {
494 since_version + 1
495 };
496
497 let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await {
498 Ok(Some((_true_latest, scheme, probed))) => (scheme, probed),
499 Ok(None) if hint_version > since_version => return None,
503 Ok(None) => return Some(Vec::new()),
504 Err(_) => return None,
506 };
507
508 let mut locations: Vec<ManifestLocation> = probed
509 .into_iter()
510 .filter(|(v, _)| *v > since_version)
511 .map(|(version, meta)| ManifestLocation {
512 version,
513 path: scheme.manifest_path(base, version),
514 size: Some(meta.size),
515 naming_scheme: scheme,
516 e_tag: meta.e_tag,
517 })
518 .collect();
519
520 if hint_version > since_version + 1 {
525 let gap_locations: Vec<ManifestLocation> =
526 futures::stream::iter((since_version + 1)..hint_version)
527 .map(|version| async move {
528 object_store
529 .inner
530 .head(&scheme.manifest_path(base, version))
531 .await
532 .map(|meta| ManifestLocation {
533 version,
534 path: scheme.manifest_path(base, version),
535 size: Some(meta.size),
536 naming_scheme: scheme,
537 e_tag: meta.e_tag,
538 })
539 })
540 .buffer_unordered(object_store.io_parallelism())
541 .try_collect()
542 .await
543 .ok()?;
544 locations.extend(gap_locations);
545 }
546
547 locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
548 Some(locations)
549}
550
551async fn resolve_version_from_listing(
553 object_store: &ObjectStore,
554 base: &Path,
555) -> Result<ManifestLocation> {
556 let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR)));
557
558 let mut valid_manifests = manifest_files.try_filter_map(|res| {
559 let filename = res.location.filename().unwrap();
560 if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
561 if scheme.parse_version(filename).is_some() {
563 future::ready(Ok(Some((scheme, res))))
564 } else {
565 future::ready(Ok(None))
566 }
567 } else {
568 future::ready(Ok(None))
569 }
570 });
571
572 let first = valid_manifests.next().await.transpose()?;
573 match (first, object_store.list_is_lexically_ordered) {
574 (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
577 let version = scheme
578 .parse_version(meta.location.filename().unwrap())
579 .unwrap();
580
581 for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
585 if scheme != ManifestNamingScheme::V2 {
586 warn!(
587 "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
588 to migrate the directory."
589 );
590 break;
591 }
592 let next_version = scheme
593 .parse_version(meta.location.filename().unwrap())
594 .unwrap();
595 if next_version >= version {
596 warn!(
597 "List operation was expected to be lexically ordered, but was not. This \
598 could mean a corrupt read. Please make a bug report on the lance-format/lance \
599 GitHub repository."
600 );
601 break;
602 }
603 }
604
605 Ok(ManifestLocation {
606 version,
607 path: meta.location,
608 size: Some(meta.size),
609 naming_scheme: scheme,
610 e_tag: meta.e_tag,
611 })
612 }
613 (Some((first_scheme, meta)), _) => {
616 let mut current_version = first_scheme
617 .parse_version(meta.location.filename().unwrap())
618 .unwrap();
619 let mut current_meta = meta;
620 let scheme = first_scheme;
621
622 while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
623 if entry_scheme != scheme {
624 return Err(Error::internal(format!(
625 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
626 Use `migrate_manifest_paths_v2` to migrate the directory.",
627 scheme, entry_scheme
628 )));
629 }
630 let version = entry_scheme
631 .parse_version(meta.location.filename().unwrap())
632 .unwrap();
633 if version > current_version {
634 current_version = version;
635 current_meta = meta;
636 }
637 }
638 Ok(ManifestLocation {
639 version: current_version,
640 path: current_meta.location,
641 size: Some(current_meta.size),
642 naming_scheme: scheme,
643 e_tag: current_meta.e_tag,
644 })
645 }
646 (None, _) => Err(Error::not_found(
647 base.clone().join(VERSIONS_DIR).to_string(),
648 )),
649 }
650}
651
652fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
656 let path = lance_io::local::to_local_path(&base.clone().join(VERSIONS_DIR));
657 let entries = std::fs::read_dir(path)?;
658
659 let mut latest_entry: Option<(u64, DirEntry)> = None;
660
661 let mut scheme: Option<ManifestNamingScheme> = None;
662
663 for entry in entries {
664 let entry = entry?;
665 let filename_raw = entry.file_name();
666 let filename = filename_raw.to_string_lossy();
667
668 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
669 continue;
672 };
673
674 if let Some(scheme) = scheme {
675 if scheme != entry_scheme {
676 return Err(io::Error::new(
677 io::ErrorKind::InvalidData,
678 format!(
679 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
680 scheme, entry_scheme
681 ),
682 ));
683 }
684 } else {
685 scheme = Some(entry_scheme);
686 }
687
688 let Some(version) = entry_scheme.parse_version(&filename) else {
689 continue;
690 };
691
692 if let Some((latest_version, _)) = &latest_entry {
693 if version > *latest_version {
694 latest_entry = Some((version, entry));
695 }
696 } else {
697 latest_entry = Some((version, entry));
698 }
699 }
700
701 if let Some((version, entry)) = latest_entry {
702 let path = Path::from_filesystem_path(entry.path())
703 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
704 let metadata = entry.metadata()?;
705 Ok(Some(ManifestLocation {
706 version,
707 path,
708 size: Some(metadata.len()),
709 naming_scheme: scheme.unwrap(),
710 e_tag: Some(get_etag(&metadata)),
711 }))
712 } else {
713 Ok(None)
714 }
715}
716
717fn list_manifests<'a>(
718 base_path: &Path,
719 object_store: &'a dyn OSObjectStore,
720) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
721 object_store
722 .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
723 .filter_map(|obj_meta| {
724 futures::future::ready(
725 obj_meta
726 .map(|m| ManifestLocation::try_from(m).ok())
727 .transpose(),
728 )
729 })
730 .boxed()
731}
732
733fn detached_manifest_location_from_meta(
735 meta: object_store::ObjectMeta,
736) -> Option<ManifestLocation> {
737 let filename = meta.location.filename()?;
738 let version = ManifestNamingScheme::parse_detached_version(filename)?;
739 Some(ManifestLocation {
740 version,
741 path: meta.location,
742 size: Some(meta.size),
743 naming_scheme: ManifestNamingScheme::V2,
744 e_tag: meta.e_tag,
745 })
746}
747
748pub fn list_detached_manifests<'a>(
750 base_path: &Path,
751 object_store: &'a dyn OSObjectStore,
752) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
753 object_store
754 .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
755 .filter_map(|obj_meta| {
756 futures::future::ready(
757 obj_meta
758 .map(detached_manifest_location_from_meta)
759 .transpose(),
760 )
761 })
762 .boxed()
763}
764
765fn make_staging_manifest_path(base: &Path) -> Result<Path> {
766 let id = uuid::Uuid::new_v4().to_string();
767 Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
768}
769
/// Name of the single query parameter expected on `s3+ddb://` URLs; its value
/// is the DynamoDB table name.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
772
773#[async_trait::async_trait]
782#[allow(clippy::too_many_arguments)]
783pub trait CommitHandler: Debug + Send + Sync {
784 async fn resolve_latest_location(
785 &self,
786 base_path: &Path,
787 object_store: &ObjectStore,
788 ) -> Result<ManifestLocation> {
789 Ok(current_manifest_path(object_store, base_path).await?)
790 }
791
792 async fn resolve_version_location(
793 &self,
794 base_path: &Path,
795 version: u64,
796 object_store: &dyn OSObjectStore,
797 ) -> Result<ManifestLocation> {
798 default_resolve_version(base_path, version, object_store).await
799 }
800
801 async fn version_exists(
807 &self,
808 base_path: &Path,
809 version: u64,
810 object_store: &dyn OSObjectStore,
811 naming_scheme: ManifestNamingScheme,
812 ) -> Result<bool> {
813 let path = naming_scheme.manifest_path(base_path, version);
814 match object_store.head(&path).await {
815 Ok(_) => Ok(true),
816 Err(ObjectStoreError::NotFound { .. }) => Ok(false),
817 Err(e) => Err(e.into()),
818 }
819 }
820
821 fn list_detached_manifest_locations<'a>(
825 &self,
826 base_path: &Path,
827 object_store: &'a ObjectStore,
828 ) -> BoxStream<'a, Result<ManifestLocation>> {
829 list_detached_manifests(base_path, &object_store.inner).boxed()
830 }
831
832 fn list_manifest_locations<'a>(
839 &self,
840 base_path: &Path,
841 object_store: &'a ObjectStore,
842 sorted_descending: bool,
843 ) -> BoxStream<'a, Result<ManifestLocation>> {
844 let underlying_stream = list_manifests(base_path, &object_store.inner);
845
846 if !sorted_descending {
847 return underlying_stream.boxed();
848 }
849
850 async fn sort_stream(
851 input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
852 ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
853 let mut locations = input_stream.try_collect::<Vec<_>>().await?;
854 locations.sort_by_key(|m| std::cmp::Reverse(m.version));
855 Ok(futures::stream::iter(locations.into_iter().map(Ok)))
856 }
857
858 if object_store.list_is_lexically_ordered {
861 let mut peekable = underlying_stream.peekable();
863
864 futures::stream::once(async move {
865 let naming_scheme = match Pin::new(&mut peekable).peek().await {
866 Some(Ok(m)) => m.naming_scheme,
867 Some(Err(_)) => ManifestNamingScheme::V2,
870 None => ManifestNamingScheme::V2,
871 };
872
873 if naming_scheme == ManifestNamingScheme::V2 {
874 Ok(Either::Left(peekable))
876 } else {
877 sort_stream(peekable).await.map(Either::Right)
878 }
879 })
880 .try_flatten()
881 .boxed()
882 } else {
883 futures::stream::once(sort_stream(underlying_stream))
888 .try_flatten()
889 .boxed()
890 }
891 }
892
893 fn list_manifest_locations_since<'a>(
901 &self,
902 base_path: &Path,
903 object_store: &'a ObjectStore,
904 since_version: u64,
905 ) -> BoxStream<'a, Result<ManifestLocation>> {
906 if !uses_version_hint(object_store) {
907 return self
908 .list_manifest_locations(base_path, object_store, true)
909 .try_take_while(move |loc| future::ready(Ok(loc.version > since_version)))
910 .boxed();
911 }
912
913 let base_path = base_path.clone();
914 futures::stream::once(async move {
915 let locations = match list_manifests_since_version_with_hint(
916 object_store,
917 &base_path,
918 since_version,
919 )
920 .await
921 {
922 Some(locations) => locations,
923 None => {
924 let mut locations = list_manifests(&base_path, &object_store.inner)
925 .try_collect::<Vec<_>>()
926 .await?;
927 locations.retain(|loc| loc.version > since_version);
928 locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
929 locations
930 }
931 };
932 Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok)))
933 })
934 .try_flatten()
935 .boxed()
936 }
937
938 async fn commit(
943 &self,
944 manifest: &mut Manifest,
945 indices: Option<Vec<IndexMetadata>>,
946 base_path: &Path,
947 object_store: &ObjectStore,
948 manifest_writer: ManifestWriter,
949 naming_scheme: ManifestNamingScheme,
950 transaction: Option<Transaction>,
951 ) -> std::result::Result<ManifestLocation, CommitError>;
952
953 async fn delete(&self, _base_path: &Path) -> Result<()> {
955 Ok(())
956 }
957}
958
959async fn default_resolve_version(
960 base_path: &Path,
961 version: u64,
962 object_store: &dyn OSObjectStore,
963) -> Result<ManifestLocation> {
964 if is_detached_version(version) {
965 return Ok(ManifestLocation {
966 version,
967 naming_scheme: ManifestNamingScheme::V2,
970 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
972 size: None,
973 e_tag: None,
974 });
975 }
976
977 let scheme = ManifestNamingScheme::V2;
979 let path = scheme.manifest_path(base_path, version);
980 match object_store.head(&path).await {
981 Ok(meta) => Ok(ManifestLocation {
982 version,
983 path,
984 size: Some(meta.size),
985 naming_scheme: scheme,
986 e_tag: meta.e_tag,
987 }),
988 Err(ObjectStoreError::NotFound { .. }) => {
989 let scheme = ManifestNamingScheme::V1;
991 Ok(ManifestLocation {
992 version,
993 path: scheme.manifest_path(base_path, version),
994 size: None,
995 naming_scheme: scheme,
996 e_tag: None,
997 })
998 }
999 Err(e) => Err(e.into()),
1000 }
1001}
/// Adapts an `object_store` AWS credential provider to the AWS SDK's
/// [`ProvideCredentials`] trait.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch credentials from the wrapped provider and hand them to the SDK
    /// with an expiry set ten minutes from now.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        /// Lifetime attached to every credential returned to the SDK.
        const ASSUMED_TTL: Duration = Duration::from_secs(60 * 10);

        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = self
                .0
                .get_credential()
                .await
                .map_err(|e| CredentialsError::provider_error(Box::new(e)))?;

            let expires_at = SystemTime::now()
                .checked_add(ASSUMED_TTL)
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
1037
/// Build a DynamoDB-backed external manifest store.
///
/// The client uses the given region and credentials, no identity cache, and the
/// standard retry policy with at most five attempts. `endpoint`, when given,
/// overrides the service endpoint URL.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    let dynamodb_config = match endpoint {
        Some(url) => base_config.endpoint_url(url),
        None => base_config,
    };

    let client = Client::from_conf(dynamodb_config.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
1069
1070pub async fn commit_handler_from_url(
1071 url_or_path: &str,
1072 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
1074) -> Result<Arc<dyn CommitHandler>> {
1075 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
1076 Arc::new(RenameCommitHandler)
1077 } else {
1078 Arc::new(ConditionalPutCommitHandler)
1079 };
1080
1081 let url = match Url::parse(url_or_path) {
1082 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
1083 return Ok(local_handler);
1085 }
1086 Ok(url) => url,
1087 Err(_) => {
1088 return Ok(local_handler);
1089 }
1090 };
1091
1092 match url.scheme() {
1093 "file" | "file-object-store" => Ok(local_handler),
1094 "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => {
1095 Ok(Arc::new(ConditionalPutCommitHandler))
1096 }
1097 #[cfg(not(feature = "dynamodb"))]
1098 "s3+ddb" => Err(Error::invalid_input_source(
1099 "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
1100 )),
1101 #[cfg(feature = "dynamodb")]
1102 "s3+ddb" => {
1103 if url.query_pairs().count() != 1 {
1104 return Err(Error::invalid_input_source(
1105 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1106 ));
1107 }
1108 let table_name = match url.query_pairs().next() {
1109 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
1110 if key == DDB_URL_QUERY_KEY =>
1111 {
1112 if table_name.is_empty() {
1113 return Err(Error::invalid_input_source(
1114 "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
1115 ));
1116 }
1117 table_name
1118 }
1119 _ => {
1120 return Err(Error::invalid_input_source(
1121 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1122 ));
1123 }
1124 };
1125 let options = options.clone().unwrap_or_default();
1126 let storage_options_raw =
1127 StorageOptions(options.storage_options().cloned().unwrap_or_default());
1128 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
1129 let storage_options = storage_options_raw.as_s3_options();
1130
1131 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
1132
1133 let accessor = options.get_accessor();
1135
1136 let (aws_creds, region) = build_aws_credential(
1137 options.s3_credentials_refresh_offset,
1138 options.aws_credentials.clone(),
1139 Some(&storage_options),
1140 region,
1141 accessor,
1142 )
1143 .await?;
1144
1145 Ok(Arc::new(ExternalManifestCommitHandler {
1146 external_manifest_store: build_dynamodb_external_store(
1147 table_name,
1148 aws_creds.clone(),
1149 ®ion,
1150 dynamo_endpoint,
1151 "lancedb",
1152 )
1153 .await?,
1154 }))
1155 }
1156 _ => Ok(Arc::new(UnsafeCommitHandler)),
1157 }
1158}
1159
/// DynamoDB endpoint override, if any.
///
/// The `dynamodb_endpoint` storage option wins; otherwise the
/// `DYNAMODB_ENDPOINT` environment variable is consulted.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
1168
1169#[derive(Debug)]
1171pub enum CommitError {
1172 CommitConflict,
1174 OtherError(Error),
1176}
1177
1178impl From<Error> for CommitError {
1179 fn from(e: Error) -> Self {
1180 Self::OtherError(e)
1181 }
1182}
1183
1184impl From<CommitError> for Error {
1185 fn from(e: CommitError) -> Self {
1186 match e {
1187 CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
1188 CommitError::OtherError(e) => e,
1189 }
1190 }
1191}
1192
1193static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
1195
1196pub struct UnsafeCommitHandler;
1200
1201#[async_trait::async_trait]
1202#[allow(clippy::too_many_arguments)]
1203impl CommitHandler for UnsafeCommitHandler {
1204 async fn commit(
1205 &self,
1206 manifest: &mut Manifest,
1207 indices: Option<Vec<IndexMetadata>>,
1208 base_path: &Path,
1209 object_store: &ObjectStore,
1210 manifest_writer: ManifestWriter,
1211 naming_scheme: ManifestNamingScheme,
1212 transaction: Option<Transaction>,
1213 ) -> std::result::Result<ManifestLocation, CommitError> {
1214 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
1216 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
1217 log::warn!(
1218 "Using unsafe commit handler. Concurrent writes may result in data loss. \
1219 Consider providing a commit handler that prevents conflicting writes."
1220 );
1221 }
1222
1223 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
1224 let res =
1225 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
1226
1227 write_version_hint(object_store, base_path, manifest.version).await;
1228
1229 Ok(ManifestLocation {
1230 version: manifest.version,
1231 size: Some(res.size as u64),
1232 naming_scheme,
1233 path: version_path,
1234 e_tag: res.e_tag,
1235 })
1236 }
1237}
1238
1239impl Debug for UnsafeCommitHandler {
1240 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1241 f.debug_struct("UnsafeCommitHandler").finish()
1242 }
1243}
1244
/// A lock that can be taken for a version before its manifest is written.
///
/// Any `CommitLock` that is `Send + Sync` is also a `CommitHandler` through
/// the blanket impl in this file, which takes the lock around the manifest
/// write.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Lease type handed out by [`CommitLock::lock`].
    type Lease: CommitLease;

    /// Acquire the lock for `version`, returning a lease to release afterwards.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
1264
/// A held commit lock, obtained from [`CommitLock::lock`].
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Release the lease; `success` tells the lock whether the commit succeeded.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
1274
1275struct LeaseGuard<L: CommitLease + 'static> {
1284 lease: Option<L>,
1285}
1286
1287impl<L: CommitLease + 'static> LeaseGuard<L> {
1288 fn new(lease: L) -> Self {
1289 Self { lease: Some(lease) }
1290 }
1291
1292 async fn release(mut self, success: bool) -> std::result::Result<(), CommitError> {
1294 let result = {
1299 let lease = self
1300 .lease
1301 .as_ref()
1302 .expect("LeaseGuard released more than once");
1303 lease.release(success).await
1304 };
1305 self.lease = None;
1306 result
1307 }
1308}
1309
1310impl<L: CommitLease + 'static> Drop for LeaseGuard<L> {
1311 fn drop(&mut self) {
1312 if let Some(lease) = self.lease.take() {
1313 if let Ok(handle) = tokio::runtime::Handle::try_current() {
1318 handle.spawn(async move {
1319 let _ = lease.release(false).await;
1320 });
1321 }
1322 }
1323 }
1324}
1325
1326#[async_trait::async_trait]
1327impl<T: CommitLock + Send + Sync> CommitHandler for T
1328where
1329 T::Lease: 'static,
1330{
1331 async fn commit(
1332 &self,
1333 manifest: &mut Manifest,
1334 indices: Option<Vec<IndexMetadata>>,
1335 base_path: &Path,
1336 object_store: &ObjectStore,
1337 manifest_writer: ManifestWriter,
1338 naming_scheme: ManifestNamingScheme,
1339 transaction: Option<Transaction>,
1340 ) -> std::result::Result<ManifestLocation, CommitError> {
1341 let path = naming_scheme.manifest_path(base_path, manifest.version);
1342 let lease = LeaseGuard::new(self.lock(manifest.version).await?);
1347
1348 match object_store.inner.head(&path).await {
1350 Ok(_) => {
1351 lease.release(false).await?;
1354
1355 return Err(CommitError::CommitConflict);
1356 }
1357 Err(ObjectStoreError::NotFound { .. }) => {}
1358 Err(e) => {
1359 lease.release(false).await?;
1362
1363 return Err(CommitError::OtherError(e.into()));
1364 }
1365 }
1366 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
1367
1368 lease.release(res.is_ok()).await?;
1370
1371 let res = res?;
1372
1373 write_version_hint(object_store, base_path, manifest.version).await;
1374
1375 Ok(ManifestLocation {
1376 version: manifest.version,
1377 size: Some(res.size as u64),
1378 naming_scheme,
1379 path,
1380 e_tag: res.e_tag,
1381 })
1382 }
1383}
1384
1385#[async_trait::async_trait]
1386impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T>
1387where
1388 T::Lease: 'static,
1389{
1390 async fn commit(
1391 &self,
1392 manifest: &mut Manifest,
1393 indices: Option<Vec<IndexMetadata>>,
1394 base_path: &Path,
1395 object_store: &ObjectStore,
1396 manifest_writer: ManifestWriter,
1397 naming_scheme: ManifestNamingScheme,
1398 transaction: Option<Transaction>,
1399 ) -> std::result::Result<ManifestLocation, CommitError> {
1400 self.as_ref()
1401 .commit(
1402 manifest,
1403 indices,
1404 base_path,
1405 object_store,
1406 manifest_writer,
1407 naming_scheme,
1408 transaction,
1409 )
1410 .await
1411 }
1412}
1413
/// A [`CommitHandler`] that writes the manifest to a staging path and then
/// moves it into place with `rename_if_not_exists`, so that an already
/// existing manifest for the same version is reported as a commit conflict.
pub struct RenameCommitHandler;
1418
1419#[async_trait::async_trait]
1420impl CommitHandler for RenameCommitHandler {
1421 async fn commit(
1422 &self,
1423 manifest: &mut Manifest,
1424 indices: Option<Vec<IndexMetadata>>,
1425 base_path: &Path,
1426 object_store: &ObjectStore,
1427 manifest_writer: ManifestWriter,
1428 naming_scheme: ManifestNamingScheme,
1429 transaction: Option<Transaction>,
1430 ) -> std::result::Result<ManifestLocation, CommitError> {
1431 let path = naming_scheme.manifest_path(base_path, manifest.version);
1435 let tmp_path = make_staging_manifest_path(&path)?;
1436
1437 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1438
1439 match object_store
1440 .inner
1441 .rename_if_not_exists(&tmp_path, &path)
1442 .await
1443 {
1444 Ok(_) => {
1445 write_version_hint(object_store, base_path, manifest.version).await;
1447 Ok(ManifestLocation {
1448 version: manifest.version,
1449 path,
1450 size: Some(res.size as u64),
1451 naming_scheme,
1452 e_tag: None, })
1454 }
1455 Err(ObjectStoreError::AlreadyExists { .. }) => {
1456 let _ = object_store.delete(&tmp_path).await;
1459
1460 return Err(CommitError::CommitConflict);
1461 }
1462 Err(e) => {
1463 return Err(CommitError::OtherError(e.into()));
1465 }
1466 }
1467 }
1468}
1469
1470impl Debug for RenameCommitHandler {
1471 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1472 f.debug_struct("RenameCommitHandler").finish()
1473 }
1474}
1475
/// A [`CommitHandler`] that uploads the manifest with a create-only put
/// (`PutMode::Create`), so an already existing manifest for the same version
/// is reported as a commit conflict.
pub struct ConditionalPutCommitHandler;
1477
1478#[async_trait::async_trait]
1479impl CommitHandler for ConditionalPutCommitHandler {
1480 async fn commit(
1481 &self,
1482 manifest: &mut Manifest,
1483 indices: Option<Vec<IndexMetadata>>,
1484 base_path: &Path,
1485 object_store: &ObjectStore,
1486 manifest_writer: ManifestWriter,
1487 naming_scheme: ManifestNamingScheme,
1488 transaction: Option<Transaction>,
1489 ) -> std::result::Result<ManifestLocation, CommitError> {
1490 let path = naming_scheme.manifest_path(base_path, manifest.version);
1491
1492 let memory_store = ObjectStore::memory();
1493 let dummy_path = "dummy";
1494 manifest_writer(
1495 &memory_store,
1496 manifest,
1497 indices,
1498 &dummy_path.into(),
1499 transaction,
1500 )
1501 .await?;
1502 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1503 let size = dummy_data.len() as u64;
1504 let res = object_store
1505 .inner
1506 .put_opts(
1507 &path,
1508 dummy_data.into(),
1509 PutOptions {
1510 mode: object_store::PutMode::Create,
1511 ..Default::default()
1512 },
1513 )
1514 .await
1515 .map_err(|err| match err {
1516 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1517 CommitError::CommitConflict
1518 }
1519 _ => CommitError::OtherError(err.into()),
1520 })?;
1521
1522 write_version_hint(object_store, base_path, manifest.version).await;
1523
1524 Ok(ManifestLocation {
1525 version: manifest.version,
1526 path,
1527 size: Some(size),
1528 naming_scheme,
1529 e_tag: res.e_tag,
1530 })
1531 }
1532}
1533
1534impl Debug for ConditionalPutCommitHandler {
1535 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1536 f.debug_struct("ConditionalPutCommitHandler").finish()
1537 }
1538}
1539
/// Options controlling how a commit is carried out.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget for a commit; the `Default` impl sets it to 20.
    /// NOTE(review): how retries are counted is decided by the code that
    /// consumes this config, which is not visible here.
    pub num_retries: u32,
    /// `false` by default. Presumably turns off the automatic cleanup that
    /// otherwise follows a commit — confirm against the consumer.
    pub skip_auto_cleanup: bool,
}
1546
1547impl Default for CommitConfig {
1548 fn default() -> Self {
1549 Self {
1550 num_retries: 20,
1551 skip_auto_cleanup: false,
1552 }
1553 }
1554}
1555
1556#[cfg(test)]
1557mod tests {
1558 use std::sync::atomic::AtomicUsize;
1559
1560 use lance_core::utils::tempfile::TempObjDir;
1561
1562 use super::*;
1563
1564 #[test]
1565 fn test_manifest_naming_scheme() {
1566 let v1 = ManifestNamingScheme::V1;
1567 let v2 = ManifestNamingScheme::V2;
1568
1569 assert_eq!(
1570 v1.manifest_path(&Path::from("base"), 0),
1571 Path::from("base/_versions/0.manifest")
1572 );
1573 assert_eq!(
1574 v1.manifest_path(&Path::from("base"), 42),
1575 Path::from("base/_versions/42.manifest")
1576 );
1577
1578 assert_eq!(
1579 v2.manifest_path(&Path::from("base"), 0),
1580 Path::from("base/_versions/18446744073709551615.manifest")
1581 );
1582 assert_eq!(
1583 v2.manifest_path(&Path::from("base"), 42),
1584 Path::from("base/_versions/18446744073709551573.manifest")
1585 );
1586
1587 assert_eq!(v1.parse_version("0.manifest"), Some(0));
1588 assert_eq!(v1.parse_version("42.manifest"), Some(42));
1589 assert_eq!(
1590 v1.parse_version("42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1591 Some(42)
1592 );
1593
1594 assert_eq!(v2.parse_version("18446744073709551615.manifest"), Some(0));
1595 assert_eq!(v2.parse_version("18446744073709551573.manifest"), Some(42));
1596 assert_eq!(
1597 v2.parse_version("18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc"),
1598 Some(42)
1599 );
1600
1601 assert_eq!(ManifestNamingScheme::detect_scheme("0.manifest"), Some(v1));
1602 assert_eq!(
1603 ManifestNamingScheme::detect_scheme("18446744073709551615.manifest"),
1604 Some(v2)
1605 );
1606 assert_eq!(ManifestNamingScheme::detect_scheme("something else"), None);
1607 }
1608
1609 #[tokio::test]
1610 async fn test_manifest_naming_migration() {
1611 let object_store = ObjectStore::memory();
1612 let base = Path::from("base");
1613 let versions_dir = base.clone().join(VERSIONS_DIR);
1614
1615 let original_files = vec![
1617 versions_dir.clone().join("irrelevant"),
1618 ManifestNamingScheme::V1.manifest_path(&base, 0),
1619 ManifestNamingScheme::V2.manifest_path(&base, 1),
1620 ];
1621 for path in original_files {
1622 object_store.put(&path, b"".as_slice()).await.unwrap();
1623 }
1624
1625 migrate_scheme_to_v2(&object_store, &base).await.unwrap();
1626
1627 let expected_files = vec![
1628 ManifestNamingScheme::V2.manifest_path(&base, 1),
1629 ManifestNamingScheme::V2.manifest_path(&base, 0),
1630 versions_dir.clone().join("irrelevant"),
1631 ];
1632 let actual_files = object_store
1633 .inner
1634 .list(Some(&versions_dir))
1635 .map_ok(|res| res.location)
1636 .try_collect::<Vec<_>>()
1637 .await
1638 .unwrap();
1639 assert_eq!(actual_files, expected_files);
1640 }
1641
1642 #[tokio::test]
1643 #[rstest::rstest]
1644 async fn test_list_manifests_sorted(
1645 #[values(true, false)] lexical_list_store: bool,
1646 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1647 naming_scheme: ManifestNamingScheme,
1648 ) {
1649 let tempdir;
1650 let (object_store, base) = if lexical_list_store {
1651 (Box::new(ObjectStore::memory()), Path::from("base"))
1652 } else {
1653 tempdir = TempObjDir::default();
1654 let path = tempdir.clone().join("base");
1655 let store = Box::new(ObjectStore::local());
1656 assert!(!store.list_is_lexically_ordered);
1657 (store, path)
1658 };
1659
1660 let mut expected_paths = Vec::new();
1662 for i in (0..12).rev() {
1663 let path = naming_scheme.manifest_path(&base, i);
1664 object_store.put(&path, b"".as_slice()).await.unwrap();
1665 expected_paths.push(path);
1666 }
1667
1668 let actual_versions = ConditionalPutCommitHandler
1669 .list_manifest_locations(&base, &object_store, true)
1670 .map_ok(|location| location.path)
1671 .try_collect::<Vec<_>>()
1672 .await
1673 .unwrap();
1674
1675 assert_eq!(actual_versions, expected_paths);
1676 }
1677
1678 #[tokio::test]
1679 #[rstest::rstest]
1680 async fn test_current_manifest_path(
1681 #[values(true, false)] lexical_list_store: bool,
1682 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1683 naming_scheme: ManifestNamingScheme,
1684 ) {
1685 let mut object_store = ObjectStore::memory();
1688 object_store.list_is_lexically_ordered = lexical_list_store;
1689 let object_store = Box::new(object_store);
1690 let base = Path::from("base");
1691
1692 for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
1694 let path = naming_scheme.manifest_path(&base, version);
1695 object_store.put(&path, b"".as_slice()).await.unwrap();
1696 }
1697
1698 let location = current_manifest_path(&object_store, &base).await.unwrap();
1699
1700 assert_eq!(location.version, 11);
1701 assert_eq!(location.naming_scheme, naming_scheme);
1702 assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
1703 }
1704
1705 fn non_lexical_memory_store() -> Box<ObjectStore> {
1708 let mut object_store = ObjectStore::memory();
1709 object_store.list_is_lexically_ordered = false;
1710 Box::new(object_store)
1711 }
1712
1713 #[tokio::test]
1714 async fn test_write_version_hint() {
1715 let base = Path::from("base");
1716
1717 let lexical = ObjectStore::memory();
1719 write_version_hint(&lexical, &base, 42).await;
1720 assert_eq!(read_version_from_hint(&lexical, &base).await, None);
1721
1722 let object_store = non_lexical_memory_store();
1723 write_version_hint(&object_store, &base, 42).await;
1724 assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42));
1725
1726 write_version_hint(&object_store, &base, 100).await;
1728 assert_eq!(
1729 read_version_from_hint(&object_store, &base).await,
1730 Some(100)
1731 );
1732
1733 write_version_hint(
1735 &object_store,
1736 &base,
1737 crate::format::DETACHED_VERSION_MASK | 7,
1738 )
1739 .await;
1740 assert_eq!(
1741 read_version_from_hint(&object_store, &base).await,
1742 Some(100)
1743 );
1744
1745 let hint_path = version_hint_path(&base);
1747 object_store
1748 .put(&hint_path, b"not json".as_slice())
1749 .await
1750 .unwrap();
1751 assert_eq!(read_version_from_hint(&object_store, &base).await, None);
1752 }
1753
1754 #[tokio::test]
1755 #[rstest::rstest]
1756 async fn test_read_version_hint_and_probe(
1757 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1758 naming_scheme: ManifestNamingScheme,
1759 ) {
1760 let object_store = non_lexical_memory_store();
1761 let base = Path::from("base");
1762
1763 assert!(
1765 read_version_hint_and_probe(&object_store, &base)
1766 .await
1767 .is_none()
1768 );
1769
1770 for version in 1..=5 {
1771 object_store
1772 .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1773 .await
1774 .unwrap();
1775 }
1776
1777 write_version_hint(&object_store, &base, 3).await;
1779 let location = read_version_hint_and_probe(&object_store, &base)
1780 .await
1781 .unwrap();
1782 assert_eq!(location.version, 5);
1783 assert_eq!(location.naming_scheme, naming_scheme);
1784
1785 write_version_hint(&object_store, &base, 5).await;
1787 let location = read_version_hint_and_probe(&object_store, &base)
1788 .await
1789 .unwrap();
1790 assert_eq!(location.version, 5);
1791
1792 write_version_hint(&object_store, &base, 10).await;
1794 assert!(
1795 read_version_hint_and_probe(&object_store, &base)
1796 .await
1797 .is_none()
1798 );
1799 }
1800
1801 #[tokio::test]
1802 async fn test_list_manifests_since_version_with_hint() {
1803 let object_store = non_lexical_memory_store();
1804 let base = Path::from("base");
1805 let scheme = ManifestNamingScheme::V2;
1806
1807 for version in 1..=10 {
1808 object_store
1809 .put(&scheme.manifest_path(&base, version), b"".as_slice())
1810 .await
1811 .unwrap();
1812 }
1813
1814 assert!(
1816 list_manifests_since_version_with_hint(&object_store, &base, 7)
1817 .await
1818 .is_none()
1819 );
1820
1821 write_version_hint(&object_store, &base, 10).await;
1823 assert!(matches!(
1824 list_manifests_since_version_with_hint(&object_store, &base, 10).await,
1825 Some(v) if v.is_empty()
1826 ));
1827
1828 let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1831 .await
1832 .unwrap();
1833 assert_eq!(
1834 locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1835 vec![10, 9, 8]
1836 );
1837
1838 write_version_hint(&object_store, &base, 8).await;
1840 let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1841 .await
1842 .unwrap();
1843 assert_eq!(
1844 locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1845 vec![10, 9, 8]
1846 );
1847
1848 write_version_hint(&object_store, &base, 20).await;
1850 assert!(
1851 list_manifests_since_version_with_hint(&object_store, &base, 7)
1852 .await
1853 .is_none()
1854 );
1855 }
1856
1857 #[tokio::test]
1858 async fn test_current_manifest_path_with_hint_non_lexical() {
1859 let object_store = non_lexical_memory_store();
1861 let base = Path::from("base");
1862 let naming_scheme = ManifestNamingScheme::V2;
1863
1864 for version in 1..=100 {
1865 object_store
1866 .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1867 .await
1868 .unwrap();
1869 }
1870
1871 write_version_hint(&object_store, &base, 98).await;
1873 let location = current_manifest_path(&object_store, &base).await.unwrap();
1874 assert_eq!(location.version, 100);
1875 }
1876
1877 #[tokio::test]
1878 async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() {
1879 let object_store = non_lexical_memory_store();
1880 let base = Path::from("base");
1881 let naming_scheme = ManifestNamingScheme::V2;
1882
1883 object_store
1885 .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice())
1886 .await
1887 .unwrap();
1888 write_version_hint(&object_store, &base, 10).await;
1889
1890 let location = current_manifest_path(&object_store, &base).await.unwrap();
1892 assert_eq!(location.version, 5);
1893 }
1894
1895 #[test]
1896 fn test_parse_detached_version() {
1897 assert_eq!(
1899 ManifestNamingScheme::parse_detached_version("d12345.manifest"),
1900 Some(12345)
1901 );
1902 assert_eq!(
1903 ManifestNamingScheme::parse_detached_version("d9223372036854775808.manifest"),
1904 Some(9223372036854775808)
1905 );
1906
1907 assert_eq!(
1909 ManifestNamingScheme::parse_detached_version("12345.manifest"),
1910 None
1911 );
1912
1913 assert_eq!(
1915 ManifestNamingScheme::parse_detached_version("18446744073709551615.manifest"),
1916 None
1917 );
1918
1919 assert_eq!(ManifestNamingScheme::parse_detached_version("d12345"), None);
1921 }
1922
1923 #[tokio::test]
1924 async fn test_list_detached_manifests() {
1925 use crate::format::DETACHED_VERSION_MASK;
1926 use futures::TryStreamExt;
1927
1928 let object_store = ObjectStore::memory();
1929 let base = Path::from("base");
1930 let versions_dir = base.clone().join(VERSIONS_DIR);
1931
1932 for version in [1, 2, 3] {
1934 let path = ManifestNamingScheme::V2.manifest_path(&base, version);
1935 object_store.put(&path, b"".as_slice()).await.unwrap();
1936 }
1937
1938 let detached_versions: Vec<u64> = vec![
1940 100 | DETACHED_VERSION_MASK,
1941 200 | DETACHED_VERSION_MASK,
1942 300 | DETACHED_VERSION_MASK,
1943 ];
1944 for version in &detached_versions {
1945 let path = versions_dir.clone().join(format!("d{}.manifest", version));
1946 object_store.put(&path, b"".as_slice()).await.unwrap();
1947 }
1948
1949 let detached_locations: Vec<ManifestLocation> =
1951 list_detached_manifests(&base, &object_store.inner)
1952 .try_collect()
1953 .await
1954 .unwrap();
1955
1956 assert_eq!(detached_locations.len(), 3);
1957 for loc in &detached_locations {
1958 assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
1959 }
1960
1961 let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
1962 found_versions.sort();
1963 let mut expected_versions = detached_versions.clone();
1964 expected_versions.sort();
1965 assert_eq!(found_versions, expected_versions);
1966 }
1967
1968 #[tokio::test]
1969 async fn test_commit_handler_from_url_memory_schemes() {
1970 for url in ["memory://bucket-a/ds", "shared-memory://bucket-a/ds"] {
1975 let handler = commit_handler_from_url(url, &None).await.unwrap();
1976 assert_eq!(
1977 format!("{:?}", handler),
1978 "ConditionalPutCommitHandler",
1979 "{url} should route to ConditionalPutCommitHandler",
1980 );
1981 }
1982 }
1983
1984 #[derive(Debug)]
1987 struct TrackingLock {
1988 released: Arc<AtomicBool>,
1989 }
1990
1991 struct TrackingLease {
1992 released: Arc<AtomicBool>,
1993 }
1994
1995 #[async_trait::async_trait]
1996 impl CommitLock for TrackingLock {
1997 type Lease = TrackingLease;
1998 async fn lock(&self, _version: u64) -> std::result::Result<Self::Lease, CommitError> {
1999 Ok(TrackingLease {
2000 released: self.released.clone(),
2001 })
2002 }
2003 }
2004
2005 #[async_trait::async_trait]
2006 impl CommitLease for TrackingLease {
2007 async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
2008 self.released
2009 .store(true, std::sync::atomic::Ordering::SeqCst);
2010 Ok(())
2011 }
2012 }
2013
2014 #[derive(Debug)]
2018 struct HangingReleaseLock {
2019 release_calls: Arc<AtomicUsize>,
2020 released: Arc<AtomicBool>,
2021 }
2022
2023 struct HangingReleaseLease {
2024 release_calls: Arc<AtomicUsize>,
2025 released: Arc<AtomicBool>,
2026 }
2027
2028 #[async_trait::async_trait]
2029 impl CommitLock for HangingReleaseLock {
2030 type Lease = HangingReleaseLease;
2031 async fn lock(&self, _version: u64) -> std::result::Result<Self::Lease, CommitError> {
2032 Ok(HangingReleaseLease {
2033 release_calls: self.release_calls.clone(),
2034 released: self.released.clone(),
2035 })
2036 }
2037 }
2038
2039 #[async_trait::async_trait]
2040 impl CommitLease for HangingReleaseLease {
2041 async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
2042 if self
2047 .release_calls
2048 .fetch_add(1, std::sync::atomic::Ordering::SeqCst)
2049 == 0
2050 {
2051 future::pending::<()>().await;
2052 unreachable!()
2053 }
2054 self.released
2055 .store(true, std::sync::atomic::Ordering::SeqCst);
2056 Ok(())
2057 }
2058 }
2059
2060 fn succeeding_manifest_writer<'a>(
2063 _object_store: &'a ObjectStore,
2064 _manifest: &'a mut Manifest,
2065 _indices: Option<Vec<IndexMetadata>>,
2066 _path: &'a Path,
2067 _transaction: Option<Transaction>,
2068 ) -> BoxFuture<'a, Result<WriteResult>> {
2069 Box::pin(async move { Ok(WriteResult::default()) })
2070 }
2071
2072 fn hanging_manifest_writer<'a>(
2074 _object_store: &'a ObjectStore,
2075 _manifest: &'a mut Manifest,
2076 _indices: Option<Vec<IndexMetadata>>,
2077 _path: &'a Path,
2078 _transaction: Option<Transaction>,
2079 ) -> BoxFuture<'a, Result<WriteResult>> {
2080 Box::pin(async move {
2081 future::pending::<()>().await;
2082 unreachable!()
2083 })
2084 }
2085
2086 #[tokio::test]
2089 async fn test_commit_lock_released_on_cancellation() {
2090 use std::collections::HashMap;
2091 use std::sync::atomic::Ordering;
2092 use std::time::Duration;
2093
2094 use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
2095 use lance_core::datatypes::Schema;
2096 use lance_file::version::LanceFileVersion;
2097
2098 use crate::format::DataStorageFormat;
2099
2100 let released = Arc::new(AtomicBool::new(false));
2101 let lock = TrackingLock {
2102 released: released.clone(),
2103 };
2104
2105 let object_store = ObjectStore::memory();
2106 let base_path = Path::from("test");
2107 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, false)]);
2108 let mut manifest = Manifest::new(
2109 Schema::try_from(&arrow_schema).unwrap(),
2110 Arc::new(vec![]),
2111 DataStorageFormat::new(LanceFileVersion::Stable),
2112 HashMap::new(),
2113 );
2114
2115 let commit_fut = lock.commit(
2118 &mut manifest,
2119 None,
2120 &base_path,
2121 &object_store,
2122 hanging_manifest_writer,
2123 ManifestNamingScheme::V2,
2124 None,
2125 );
2126 let timed_out = tokio::time::timeout(Duration::from_millis(50), commit_fut).await;
2127 assert!(timed_out.is_err(), "commit should not have completed");
2128
2129 for _ in 0..100 {
2131 if released.load(Ordering::SeqCst) {
2132 break;
2133 }
2134 tokio::time::sleep(Duration::from_millis(10)).await;
2135 }
2136 assert!(
2137 released.load(Ordering::SeqCst),
2138 "lock must be released after the commit future is cancelled"
2139 );
2140 }
2141
2142 #[tokio::test]
2146 async fn test_commit_lock_released_on_cancellation_during_release() {
2147 use std::collections::HashMap;
2148 use std::sync::atomic::Ordering;
2149 use std::time::Duration;
2150
2151 use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
2152 use lance_core::datatypes::Schema;
2153 use lance_file::version::LanceFileVersion;
2154
2155 use crate::format::DataStorageFormat;
2156
2157 let release_calls = Arc::new(AtomicUsize::new(0));
2158 let released = Arc::new(AtomicBool::new(false));
2159 let lock = HangingReleaseLock {
2160 release_calls: release_calls.clone(),
2161 released: released.clone(),
2162 };
2163
2164 let object_store = ObjectStore::memory();
2165 let base_path = Path::from("test");
2166 let arrow_schema = ArrowSchema::new(vec![ArrowField::new("i", DataType::Int32, false)]);
2167 let mut manifest = Manifest::new(
2168 Schema::try_from(&arrow_schema).unwrap(),
2169 Arc::new(vec![]),
2170 DataStorageFormat::new(LanceFileVersion::Stable),
2171 HashMap::new(),
2172 );
2173
2174 let commit_fut = lock.commit(
2177 &mut manifest,
2178 None,
2179 &base_path,
2180 &object_store,
2181 succeeding_manifest_writer,
2182 ManifestNamingScheme::V2,
2183 None,
2184 );
2185 let timed_out = tokio::time::timeout(Duration::from_millis(50), commit_fut).await;
2186 assert!(timed_out.is_err(), "commit should not have completed");
2187
2188 for _ in 0..100 {
2191 if released.load(Ordering::SeqCst) {
2192 break;
2193 }
2194 tokio::time::sleep(Duration::from_millis(10)).await;
2195 }
2196 assert!(
2197 released.load(Ordering::SeqCst),
2198 "lock must be released even when cancelled during the explicit release"
2199 );
2200 assert_eq!(
2201 release_calls.load(Ordering::SeqCst),
2202 2,
2203 "expected the hung explicit release plus one best-effort drop release"
2204 );
2205 }
2206}