1use std::io;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::sync::atomic::AtomicBool;
29use std::{fmt::Debug, fs::DirEntry};
30
31use super::manifest::write_manifest;
32use futures::Stream;
33use futures::future::Either;
34use futures::{
35 StreamExt, TryStreamExt,
36 future::{self, BoxFuture},
37 stream::BoxStream,
38};
39use lance_file::format::{MAGIC, MAJOR_VERSION, MINOR_VERSION};
40use lance_io::object_writer::{ObjectWriter, WriteResult, get_etag};
41use log::warn;
42use object_store::ObjectStoreExt as OSObjectStoreExt;
43use object_store::PutOptions;
44use object_store::{Error as ObjectStoreError, ObjectStore as OSObjectStore, path::Path};
45use tracing::info;
46use url::Url;
47
48#[cfg(feature = "dynamodb")]
49pub mod dynamodb;
50pub mod external_manifest;
51
52use lance_core::{Error, Result};
53use lance_io::object_store::{ObjectStore, ObjectStoreExt, ObjectStoreParams};
54use lance_io::traits::{WriteExt, Writer};
55
56use crate::format::{IndexMetadata, Manifest, Transaction, is_detached_version};
57use lance_core::utils::tracing::{AUDIT_MODE_CREATE, AUDIT_TYPE_MANIFEST, TRACE_FILE_AUDIT};
58#[cfg(feature = "dynamodb")]
59use {
60 self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
61 aws_credential_types::provider::ProvideCredentials,
62 aws_credential_types::provider::error::CredentialsError,
63 lance_io::object_store::{StorageOptions, providers::aws::build_aws_credential},
64 object_store::aws::AmazonS3ConfigKey,
65 object_store::aws::AwsCredentialProvider,
66 std::borrow::Cow,
67 std::time::{Duration, SystemTime},
68};
69
/// Name of the directory, relative to the dataset base path, that holds manifest files.
pub const VERSIONS_DIR: &str = "_versions";
/// File extension (without the leading dot) of manifest files.
const MANIFEST_EXTENSION: &str = "manifest";
/// Filename prefix used for manifests of detached versions.
const DETACHED_VERSION_PREFIX: &str = "d";
/// Name of the version hint file stored inside [`VERSIONS_DIR`].
const VERSION_HINT_FILE: &str = "latest_version_hint.json";
80
/// How manifest files are named inside the versions directory.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ManifestNamingScheme {
    /// `{version}.manifest`
    V1,
    /// `{u64::MAX - version}.manifest`, zero-padded to 20 digits, so that
    /// higher versions sort lexically before lower ones.
    V2,
}
91
92impl ManifestNamingScheme {
93 pub fn manifest_path(&self, base: &Path, version: u64) -> Path {
94 if is_detached_version(version) {
95 base.clone().join(VERSIONS_DIR).join(format!(
100 "{DETACHED_VERSION_PREFIX}{version}.{MANIFEST_EXTENSION}"
101 ))
102 } else {
103 let directory = base.clone().join(VERSIONS_DIR);
104 match self {
105 Self::V1 => directory.join(format!("{version}.{MANIFEST_EXTENSION}")),
106 Self::V2 => {
107 let inverted_version = u64::MAX - version;
108 directory.join(format!("{inverted_version:020}.{MANIFEST_EXTENSION}"))
109 }
110 }
111 }
112 }
113
114 pub fn parse_version(&self, filename: &str) -> Option<u64> {
115 let file_number = filename
116 .split_once('.')
117 .and_then(|(version_str, _)| version_str.parse::<u64>().ok());
119 match self {
120 Self::V1 => file_number,
121 Self::V2 => file_number.map(|v| u64::MAX - v),
122 }
123 }
124
125 pub fn parse_detached_version(filename: &str) -> Option<u64> {
129 if !filename.starts_with(DETACHED_VERSION_PREFIX) {
130 return None;
131 }
132 let without_prefix = &filename[DETACHED_VERSION_PREFIX.len()..];
133 without_prefix
134 .split_once('.')
135 .and_then(|(version_str, _)| version_str.parse::<u64>().ok())
136 }
137
138 pub fn detect_scheme(filename: &str) -> Option<Self> {
139 if filename.starts_with(DETACHED_VERSION_PREFIX) {
140 return Some(Self::V2);
142 }
143 if filename.ends_with(MANIFEST_EXTENSION) {
144 const V2_LEN: usize = 20 + 1 + MANIFEST_EXTENSION.len();
145 if filename.len() == V2_LEN {
146 Some(Self::V2)
147 } else {
148 Some(Self::V1)
149 }
150 } else {
151 None
152 }
153 }
154
155 pub fn detect_scheme_staging(filename: &str) -> Self {
156 if filename.chars().nth(20) == Some('.') {
159 Self::V2
160 } else {
161 Self::V1
162 }
163 }
164}
165
166pub async fn migrate_scheme_to_v2(object_store: &ObjectStore, dataset_base: &Path) -> Result<()> {
176 object_store
177 .inner
178 .list(Some(&dataset_base.clone().join(VERSIONS_DIR)))
179 .try_filter(|res| {
180 let res = if let Some(filename) = res.location.filename() {
181 ManifestNamingScheme::detect_scheme(filename) == Some(ManifestNamingScheme::V1)
182 } else {
183 false
184 };
185 future::ready(res)
186 })
187 .try_for_each_concurrent(object_store.io_parallelism(), |meta| async move {
188 let filename = meta.location.filename().unwrap();
189 let version = ManifestNamingScheme::V1.parse_version(filename).unwrap();
190 let path = ManifestNamingScheme::V2.manifest_path(dataset_base, version);
191 object_store.inner.rename(&meta.location, &path).await?;
192 Ok(())
193 })
194 .await?;
195
196 Ok(())
197}
198
/// Function pointer type for a routine that writes `manifest` (with optional
/// `indices` and `transaction`) to `path` on `object_store`.
///
/// [`write_manifest_file_to_path`] has this signature.
pub type ManifestWriter = for<'a> fn(
    object_store: &'a ObjectStore,
    manifest: &'a mut Manifest,
    indices: Option<Vec<IndexMetadata>>,
    path: &'a Path,
    transaction: Option<Transaction>,
) -> BoxFuture<'a, Result<WriteResult>>;
209
210pub fn write_manifest_file_to_path<'a>(
214 object_store: &'a ObjectStore,
215 manifest: &'a mut Manifest,
216 indices: Option<Vec<IndexMetadata>>,
217 path: &'a Path,
218 transaction: Option<Transaction>,
219) -> BoxFuture<'a, Result<WriteResult>> {
220 Box::pin(async move {
221 let mut object_writer = ObjectWriter::new(object_store, path).await?;
222 let pos = write_manifest(&mut object_writer, manifest, indices, transaction).await?;
223 object_writer
224 .write_magics(pos, MAJOR_VERSION, MINOR_VERSION, MAGIC)
225 .await?;
226 let res = Writer::shutdown(&mut object_writer).await?;
227 info!(target: TRACE_FILE_AUDIT, mode=AUDIT_MODE_CREATE, r#type=AUDIT_TYPE_MANIFEST, path = path.to_string());
228 Ok(res)
229 })
230}
231
/// Location and basic metadata of a manifest file.
#[derive(Debug, Clone)]
pub struct ManifestLocation {
    /// Version of the dataset that this manifest describes.
    pub version: u64,
    /// Path of the manifest file in the object store.
    pub path: Path,
    /// Size of the manifest file in bytes, when known.
    pub size: Option<u64>,
    /// Naming scheme that `path` follows.
    pub naming_scheme: ManifestNamingScheme,
    /// Entity tag of the manifest object, when known.
    pub e_tag: Option<String>,
}
247
248impl TryFrom<object_store::ObjectMeta> for ManifestLocation {
249 type Error = Error;
250
251 fn try_from(meta: object_store::ObjectMeta) -> Result<Self> {
252 let filename = meta.location.filename().ok_or_else(|| {
253 Error::internal("ObjectMeta location does not have a filename".to_string())
254 })?;
255 let scheme = ManifestNamingScheme::detect_scheme(filename)
256 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
257 let version = scheme
258 .parse_version(filename)
259 .ok_or_else(|| Error::internal(format!("Invalid manifest filename: '{}'", filename)))?;
260 Ok(Self {
261 version,
262 path: meta.location,
263 size: Some(meta.size),
264 naming_scheme: scheme,
265 e_tag: meta.e_tag,
266 })
267 }
268}
269
270async fn current_manifest_path(
280 object_store: &ObjectStore,
281 base: &Path,
282) -> Result<ManifestLocation> {
283 if object_store.is_local() {
284 if let Ok(Some(location)) = current_manifest_local(base) {
285 return Ok(location);
286 }
287 } else if uses_version_hint(object_store)
288 && let Some(location) = read_version_hint_and_probe(object_store, base).await
289 {
290 return Ok(location);
291 }
292
293 resolve_version_from_listing(object_store, base).await
294}
295
/// JSON payload of the version hint file.
#[derive(serde::Serialize, serde::Deserialize)]
struct VersionHint {
    /// Version recorded when the hint was written.
    version: u64,
}
301
/// Environment variable that can switch the version hint mechanism off.
const VERSION_HINT_ENV: &str = "LANCE_USE_VERSION_HINT";

/// Reports whether version hints are enabled for this process.
///
/// The environment variable is read once and the answer is cached. Hints are
/// enabled unless the variable is set to `0`, `false` or `off` (surrounding
/// whitespace and ASCII case are ignored). An unset or unreadable variable
/// means enabled.
fn version_hint_globally_enabled() -> bool {
    static ENABLED: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    *ENABLED.get_or_init(|| {
        std::env::var(VERSION_HINT_ENV).map_or(true, |raw| {
            let normalized = raw.trim().to_ascii_lowercase();
            !["0", "false", "off"].contains(&normalized.as_str())
        })
    })
}
318
319pub fn uses_version_hint(object_store: &ObjectStore) -> bool {
328 version_hint_globally_enabled() && !object_store.list_is_lexically_ordered
329}
330
331fn version_hint_path(base: &Path) -> Path {
333 base.clone().join(VERSIONS_DIR).join(VERSION_HINT_FILE)
334}
335
336pub async fn write_version_hint(object_store: &ObjectStore, base: &Path, version: u64) {
344 if is_detached_version(version) || !uses_version_hint(object_store) {
345 return;
346 }
347 let hint_path = version_hint_path(base);
348 let content = serde_json::to_vec(&VersionHint { version }).expect("serialize version hint");
349 if let Err(e) = object_store.put(&hint_path, content.as_slice()).await {
350 warn!("Failed to write version hint file for version {version}: {e}");
351 }
352}
353
354async fn read_version_from_hint(object_store: &ObjectStore, base: &Path) -> Option<u64> {
357 let bytes = object_store
358 .inner
359 .get(&version_hint_path(base))
360 .await
361 .ok()?
362 .bytes()
363 .await
364 .ok()?;
365 Some(serde_json::from_slice::<VersionHint>(&bytes).ok()?.version)
366}
367
368async fn read_version_hint_and_probe(
373 object_store: &ObjectStore,
374 base: &Path,
375) -> Option<ManifestLocation> {
376 let hint_version = read_version_from_hint(object_store, base).await?;
377 let (version, scheme, mut probed) = probe_versions_upward(object_store, base, hint_version)
378 .await
379 .ok()
380 .flatten()?;
381 let (_, meta) = probed.pop()?;
383 Some(ManifestLocation {
384 version,
385 path: scheme.manifest_path(base, version),
386 size: Some(meta.size),
387 naming_scheme: scheme,
388 e_tag: meta.e_tag,
389 })
390}
391
/// Largest distance the hinted version may be ahead of the requested
/// `since_version` before `list_manifests_since_version_with_hint` gives up
/// and returns `None`.
const MAX_HINT_PROBE_GAP: u64 = 1000;
396
397async fn probe_versions_upward(
414 object_store: &ObjectStore,
415 base: &Path,
416 from_version: u64,
417) -> Result<
418 Option<(
419 u64,
420 ManifestNamingScheme,
421 Vec<(u64, object_store::ObjectMeta)>,
422 )>,
423> {
424 let mut scheme = ManifestNamingScheme::V2;
426 let meta = match object_store
427 .inner
428 .head(&scheme.manifest_path(base, from_version))
429 .await
430 {
431 Ok(meta) => meta,
432 Err(ObjectStoreError::NotFound { .. }) => {
433 scheme = ManifestNamingScheme::V1;
434 match object_store
435 .inner
436 .head(&scheme.manifest_path(base, from_version))
437 .await
438 {
439 Ok(meta) => meta,
440 Err(ObjectStoreError::NotFound { .. }) => return Ok(None),
441 Err(e) => return Err(e.into()),
442 }
443 }
444 Err(e) => return Err(e.into()),
445 };
446
447 let mut probed = vec![(from_version, meta)];
448 let mut version = from_version;
449 loop {
450 let next = version + 1;
451 match object_store
452 .inner
453 .head(&scheme.manifest_path(base, next))
454 .await
455 {
456 Ok(meta) => {
457 probed.push((next, meta));
458 version = next;
459 }
460 Err(ObjectStoreError::NotFound { .. }) => break,
462 Err(e) => return Err(e.into()),
465 }
466 }
467 Ok(Some((version, scheme, probed)))
468}
469
470async fn list_manifests_since_version_with_hint(
477 object_store: &ObjectStore,
478 base: &Path,
479 since_version: u64,
480) -> Option<Vec<ManifestLocation>> {
481 let hint_version = read_version_from_hint(object_store, base).await?;
482
483 if hint_version.saturating_sub(since_version) > MAX_HINT_PROBE_GAP {
486 return None;
487 }
488
489 let probe_from = if hint_version > since_version {
492 hint_version
493 } else {
494 since_version + 1
495 };
496
497 let (scheme, probed) = match probe_versions_upward(object_store, base, probe_from).await {
498 Ok(Some((_true_latest, scheme, probed))) => (scheme, probed),
499 Ok(None) if hint_version > since_version => return None,
503 Ok(None) => return Some(Vec::new()),
504 Err(_) => return None,
506 };
507
508 let mut locations: Vec<ManifestLocation> = probed
509 .into_iter()
510 .filter(|(v, _)| *v > since_version)
511 .map(|(version, meta)| ManifestLocation {
512 version,
513 path: scheme.manifest_path(base, version),
514 size: Some(meta.size),
515 naming_scheme: scheme,
516 e_tag: meta.e_tag,
517 })
518 .collect();
519
520 if hint_version > since_version + 1 {
525 let gap_locations: Vec<ManifestLocation> =
526 futures::stream::iter((since_version + 1)..hint_version)
527 .map(|version| async move {
528 object_store
529 .inner
530 .head(&scheme.manifest_path(base, version))
531 .await
532 .map(|meta| ManifestLocation {
533 version,
534 path: scheme.manifest_path(base, version),
535 size: Some(meta.size),
536 naming_scheme: scheme,
537 e_tag: meta.e_tag,
538 })
539 })
540 .buffer_unordered(object_store.io_parallelism())
541 .try_collect()
542 .await
543 .ok()?;
544 locations.extend(gap_locations);
545 }
546
547 locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
548 Some(locations)
549}
550
/// Resolves the latest manifest by listing the versions directory.
///
/// Entries whose filename matches no naming scheme, or from which no version
/// can be parsed, are ignored. Returns a not-found error when no valid
/// manifest exists.
async fn resolve_version_from_listing(
    object_store: &ObjectStore,
    base: &Path,
) -> Result<ManifestLocation> {
    let manifest_files = object_store.list(Some(base.clone().join(VERSIONS_DIR)));

    // Keep only entries that look like manifests with a parseable version.
    let mut valid_manifests = manifest_files.try_filter_map(|res| {
        let filename = res.location.filename().unwrap();
        if let Some(scheme) = ManifestNamingScheme::detect_scheme(filename) {
            if scheme.parse_version(filename).is_some() {
                future::ready(Ok(Some((scheme, res))))
            } else {
                future::ready(Ok(None))
            }
        } else {
            future::ready(Ok(None))
        }
    });

    let first = valid_manifests.next().await.transpose()?;
    match (first, object_store.list_is_lexically_ordered) {
        // V2 names on a lexically ordered store: the first entry is taken as
        // the latest version.
        (Some((scheme @ ManifestNamingScheme::V2, meta)), true) => {
            let version = scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();

            // Sanity-check up to 999 further entries; problems are only
            // logged, they do not change the result.
            for (scheme, meta) in valid_manifests.take(999).try_collect::<Vec<_>>().await? {
                if scheme != ManifestNamingScheme::V2 {
                    warn!(
                        "Found V1 Manifest in a V2 directory. Use `migrate_manifest_paths_v2` \
                         to migrate the directory."
                    );
                    break;
                }
                let next_version = scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if next_version >= version {
                    warn!(
                        "List operation was expected to be lexically ordered, but was not. This \
                         could mean a corrupt read. Please make a bug report on the lance-format/lance \
                         GitHub repository."
                    );
                    break;
                }
            }

            Ok(ManifestLocation {
                version,
                path: meta.location,
                size: Some(meta.size),
                naming_scheme: scheme,
                e_tag: meta.e_tag,
            })
        }
        // Otherwise scan the whole listing for the maximum version and
        // require every entry to share the first entry's scheme.
        (Some((first_scheme, meta)), _) => {
            let mut current_version = first_scheme
                .parse_version(meta.location.filename().unwrap())
                .unwrap();
            let mut current_meta = meta;
            let scheme = first_scheme;

            while let Some((entry_scheme, meta)) = valid_manifests.next().await.transpose()? {
                if entry_scheme != scheme {
                    return Err(Error::internal(format!(
                        "Found multiple manifest naming schemes in the same directory: {:?} and {:?}. \
                         Use `migrate_manifest_paths_v2` to migrate the directory.",
                        scheme, entry_scheme
                    )));
                }
                let version = entry_scheme
                    .parse_version(meta.location.filename().unwrap())
                    .unwrap();
                if version > current_version {
                    current_version = version;
                    current_meta = meta;
                }
            }
            Ok(ManifestLocation {
                version: current_version,
                path: current_meta.location,
                size: Some(current_meta.size),
                naming_scheme: scheme,
                e_tag: current_meta.e_tag,
            })
        }
        // No valid manifest was listed.
        (None, _) => Err(Error::not_found(
            base.clone().join(VERSIONS_DIR).to_string(),
        )),
    }
}
651
652fn current_manifest_local(base: &Path) -> std::io::Result<Option<ManifestLocation>> {
656 let path = lance_io::local::to_local_path(&base.clone().join(VERSIONS_DIR));
657 let entries = std::fs::read_dir(path)?;
658
659 let mut latest_entry: Option<(u64, DirEntry)> = None;
660
661 let mut scheme: Option<ManifestNamingScheme> = None;
662
663 for entry in entries {
664 let entry = entry?;
665 let filename_raw = entry.file_name();
666 let filename = filename_raw.to_string_lossy();
667
668 let Some(entry_scheme) = ManifestNamingScheme::detect_scheme(&filename) else {
669 continue;
672 };
673
674 if let Some(scheme) = scheme {
675 if scheme != entry_scheme {
676 return Err(io::Error::new(
677 io::ErrorKind::InvalidData,
678 format!(
679 "Found multiple manifest naming schemes in the same directory: {:?} and {:?}",
680 scheme, entry_scheme
681 ),
682 ));
683 }
684 } else {
685 scheme = Some(entry_scheme);
686 }
687
688 let Some(version) = entry_scheme.parse_version(&filename) else {
689 continue;
690 };
691
692 if let Some((latest_version, _)) = &latest_entry {
693 if version > *latest_version {
694 latest_entry = Some((version, entry));
695 }
696 } else {
697 latest_entry = Some((version, entry));
698 }
699 }
700
701 if let Some((version, entry)) = latest_entry {
702 let path = Path::from_filesystem_path(entry.path())
703 .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
704 let metadata = entry.metadata()?;
705 Ok(Some(ManifestLocation {
706 version,
707 path,
708 size: Some(metadata.len()),
709 naming_scheme: scheme.unwrap(),
710 e_tag: Some(get_etag(&metadata)),
711 }))
712 } else {
713 Ok(None)
714 }
715}
716
717fn list_manifests<'a>(
718 base_path: &Path,
719 object_store: &'a dyn OSObjectStore,
720) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
721 object_store
722 .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
723 .filter_map(|obj_meta| {
724 futures::future::ready(
725 obj_meta
726 .map(|m| ManifestLocation::try_from(m).ok())
727 .transpose(),
728 )
729 })
730 .boxed()
731}
732
733fn detached_manifest_location_from_meta(
735 meta: object_store::ObjectMeta,
736) -> Option<ManifestLocation> {
737 let filename = meta.location.filename()?;
738 let version = ManifestNamingScheme::parse_detached_version(filename)?;
739 Some(ManifestLocation {
740 version,
741 path: meta.location,
742 size: Some(meta.size),
743 naming_scheme: ManifestNamingScheme::V2,
744 e_tag: meta.e_tag,
745 })
746}
747
748pub fn list_detached_manifests<'a>(
750 base_path: &Path,
751 object_store: &'a dyn OSObjectStore,
752) -> impl Stream<Item = Result<ManifestLocation>> + 'a {
753 object_store
754 .read_dir_all(&base_path.clone().join(VERSIONS_DIR), None)
755 .filter_map(|obj_meta| {
756 futures::future::ready(
757 obj_meta
758 .map(detached_manifest_location_from_meta)
759 .transpose(),
760 )
761 })
762 .boxed()
763}
764
765fn make_staging_manifest_path(base: &Path) -> Result<Path> {
766 let id = uuid::Uuid::new_v4().to_string();
767 Path::parse(format!("{base}-{id}")).map_err(|e| Error::io_source(Box::new(e)))
768}
769
/// Query parameter of an `s3+ddb://` URL that carries the DynamoDB table name.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
772
/// Strategy for committing new manifests and for locating existing ones.
///
/// Only [`CommitHandler::commit`] is required; every other method has a
/// default implementation built on the helper functions in this module.
#[async_trait::async_trait]
#[allow(clippy::too_many_arguments)]
pub trait CommitHandler: Debug + Send + Sync {
    /// Resolves the location of the latest manifest under `base_path`.
    async fn resolve_latest_location(
        &self,
        base_path: &Path,
        object_store: &ObjectStore,
    ) -> Result<ManifestLocation> {
        Ok(current_manifest_path(object_store, base_path).await?)
    }

    /// Resolves the location of the manifest for a specific `version`.
    async fn resolve_version_location(
        &self,
        base_path: &Path,
        version: u64,
        object_store: &dyn OSObjectStore,
    ) -> Result<ManifestLocation> {
        default_resolve_version(base_path, version, object_store).await
    }

    /// Streams the locations of all detached manifests under `base_path`.
    fn list_detached_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        list_detached_manifests(base_path, &object_store.inner).boxed()
    }

    /// Streams the locations of all manifests under `base_path`.
    ///
    /// When `sorted_descending` is `false` the listing order is returned
    /// unchanged. When it is `true` the result is ordered by descending
    /// version: on stores with lexically ordered listings whose first entry
    /// uses V2 naming the listing is streamed through as-is, otherwise all
    /// locations are collected and sorted in memory.
    fn list_manifest_locations<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        sorted_descending: bool,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        let underlying_stream = list_manifests(base_path, &object_store.inner);

        if !sorted_descending {
            return underlying_stream.boxed();
        }

        // Collects the whole stream and re-emits it sorted by descending version.
        async fn sort_stream(
            input_stream: impl futures::Stream<Item = Result<ManifestLocation>> + Unpin,
        ) -> Result<impl Stream<Item = Result<ManifestLocation>> + Unpin> {
            let mut locations = input_stream.try_collect::<Vec<_>>().await?;
            locations.sort_by_key(|m| std::cmp::Reverse(m.version));
            Ok(futures::stream::iter(locations.into_iter().map(Ok)))
        }

        if object_store.list_is_lexically_ordered {
            // Peek at the first entry to learn the naming scheme without
            // consuming it.
            let mut peekable = underlying_stream.peekable();

            futures::stream::once(async move {
                let naming_scheme = match Pin::new(&mut peekable).peek().await {
                    Some(Ok(m)) => m.naming_scheme,
                    // An error or an empty listing is treated as V2, so the
                    // stream is passed through unchanged.
                    Some(Err(_)) => ManifestNamingScheme::V2,
                    None => ManifestNamingScheme::V2,
                };

                if naming_scheme == ManifestNamingScheme::V2 {
                    Ok(Either::Left(peekable))
                } else {
                    sort_stream(peekable).await.map(Either::Right)
                }
            })
            .try_flatten()
            .boxed()
        } else {
            futures::stream::once(sort_stream(underlying_stream))
                .try_flatten()
                .boxed()
        }
    }

    /// Streams the manifests whose version is greater than `since_version`,
    /// ordered by descending version.
    ///
    /// Without version hints this takes from the descending listing while the
    /// version is above `since_version`. With hints it first tries the
    /// hint-based lookup and falls back to a full listing that is filtered and
    /// sorted in memory.
    fn list_manifest_locations_since<'a>(
        &self,
        base_path: &Path,
        object_store: &'a ObjectStore,
        since_version: u64,
    ) -> BoxStream<'a, Result<ManifestLocation>> {
        if !uses_version_hint(object_store) {
            return self
                .list_manifest_locations(base_path, object_store, true)
                .try_take_while(move |loc| future::ready(Ok(loc.version > since_version)))
                .boxed();
        }

        // Owned copy so the async block below does not borrow `base_path`.
        let base_path = base_path.clone();
        futures::stream::once(async move {
            let locations = match list_manifests_since_version_with_hint(
                object_store,
                &base_path,
                since_version,
            )
            .await
            {
                Some(locations) => locations,
                None => {
                    let mut locations = list_manifests(&base_path, &object_store.inner)
                        .try_collect::<Vec<_>>()
                        .await?;
                    locations.retain(|loc| loc.version > since_version);
                    locations.sort_by_key(|loc| std::cmp::Reverse(loc.version));
                    locations
                }
            };
            Ok::<_, Error>(futures::stream::iter(locations.into_iter().map(Ok)))
        })
        .try_flatten()
        .boxed()
    }

    /// Commits `manifest` as a new version and returns where it was written.
    ///
    /// The implementations in this file write through `manifest_writer` to the
    /// path that `naming_scheme` yields for `manifest.version`.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<IndexMetadata>>,
        base_path: &Path,
        object_store: &ObjectStore,
        manifest_writer: ManifestWriter,
        naming_scheme: ManifestNamingScheme,
        transaction: Option<Transaction>,
    ) -> std::result::Result<ManifestLocation, CommitError>;

    /// Hook for deleting handler-specific state; the default does nothing.
    async fn delete(&self, _base_path: &Path) -> Result<()> {
        Ok(())
    }
}
938
939async fn default_resolve_version(
940 base_path: &Path,
941 version: u64,
942 object_store: &dyn OSObjectStore,
943) -> Result<ManifestLocation> {
944 if is_detached_version(version) {
945 return Ok(ManifestLocation {
946 version,
947 naming_scheme: ManifestNamingScheme::V2,
950 path: ManifestNamingScheme::V2.manifest_path(base_path, version),
952 size: None,
953 e_tag: None,
954 });
955 }
956
957 let scheme = ManifestNamingScheme::V2;
959 let path = scheme.manifest_path(base_path, version);
960 match object_store.head(&path).await {
961 Ok(meta) => Ok(ManifestLocation {
962 version,
963 path,
964 size: Some(meta.size),
965 naming_scheme: scheme,
966 e_tag: meta.e_tag,
967 }),
968 Err(ObjectStoreError::NotFound { .. }) => {
969 let scheme = ManifestNamingScheme::V1;
971 Ok(ManifestLocation {
972 version,
973 path: scheme.manifest_path(base_path, version),
974 size: None,
975 naming_scheme: scheme,
976 e_tag: None,
977 })
978 }
979 Err(e) => Err(e.into()),
980 }
981}
/// Wraps an `object_store` AWS credential provider so that it can be used
/// through the AWS SDK `ProvideCredentials` trait.
#[cfg(feature = "dynamodb")]
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);
986
#[cfg(feature = "dynamodb")]
impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetches credentials from the wrapped provider and converts them into
    /// AWS SDK credentials that expire ten minutes from now.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };

            // Expiry handed to the SDK: ten minutes from now.
            let expiry = SystemTime::now()
                .checked_add(Duration::from_secs(60 * 10))
                .expect("overflow");

            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expiry),
                "",
            ))
        })
    }
}
1017
/// Builds a DynamoDB-backed external manifest store.
///
/// The client uses the given region and credentials, disables identity
/// caching, retries up to five attempts, and targets `endpoint` when one is
/// supplied.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    endpoint: Option<String>,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{
        Client,
        config::{IdentityCache, Region, retry::RetryConfig},
    };

    let base_config = aws_sdk_dynamodb::config::Builder::new()
        .behavior_version_latest()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .identity_cache(IdentityCache::no_cache())
        .retry_config(RetryConfig::standard().with_max_attempts(5));

    // Only override the endpoint when the caller supplied one.
    let dynamodb_config = match endpoint {
        Some(url) => base_config.endpoint_url(url),
        None => base_config,
    };

    let client = Client::from_conf(dynamodb_config.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
1049
1050pub async fn commit_handler_from_url(
1051 url_or_path: &str,
1052 #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
1054) -> Result<Arc<dyn CommitHandler>> {
1055 let local_handler: Arc<dyn CommitHandler> = if cfg!(windows) {
1056 Arc::new(RenameCommitHandler)
1057 } else {
1058 Arc::new(ConditionalPutCommitHandler)
1059 };
1060
1061 let url = match Url::parse(url_or_path) {
1062 Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
1063 return Ok(local_handler);
1065 }
1066 Ok(url) => url,
1067 Err(_) => {
1068 return Ok(local_handler);
1069 }
1070 };
1071
1072 match url.scheme() {
1073 "file" | "file-object-store" => Ok(local_handler),
1074 "s3" | "gs" | "az" | "abfss" | "memory" | "oss" | "cos" | "shared-memory" => {
1075 Ok(Arc::new(ConditionalPutCommitHandler))
1076 }
1077 #[cfg(not(feature = "dynamodb"))]
1078 "s3+ddb" => Err(Error::invalid_input_source(
1079 "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
1080 )),
1081 #[cfg(feature = "dynamodb")]
1082 "s3+ddb" => {
1083 if url.query_pairs().count() != 1 {
1084 return Err(Error::invalid_input_source(
1085 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1086 ));
1087 }
1088 let table_name = match url.query_pairs().next() {
1089 Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
1090 if key == DDB_URL_QUERY_KEY =>
1091 {
1092 if table_name.is_empty() {
1093 return Err(Error::invalid_input_source(
1094 "`s3+ddb://` scheme requires non empty dynamodb table name".into(),
1095 ));
1096 }
1097 table_name
1098 }
1099 _ => {
1100 return Err(Error::invalid_input_source(
1101 "`s3+ddb://` scheme and expects exactly one query `ddbTableName`".into(),
1102 ));
1103 }
1104 };
1105 let options = options.clone().unwrap_or_default();
1106 let storage_options_raw =
1107 StorageOptions(options.storage_options().cloned().unwrap_or_default());
1108 let dynamo_endpoint = get_dynamodb_endpoint(&storage_options_raw);
1109 let storage_options = storage_options_raw.as_s3_options();
1110
1111 let region = storage_options.get(&AmazonS3ConfigKey::Region).cloned();
1112
1113 let accessor = options.get_accessor();
1115
1116 let (aws_creds, region) = build_aws_credential(
1117 options.s3_credentials_refresh_offset,
1118 options.aws_credentials.clone(),
1119 Some(&storage_options),
1120 region,
1121 accessor,
1122 )
1123 .await?;
1124
1125 Ok(Arc::new(ExternalManifestCommitHandler {
1126 external_manifest_store: build_dynamodb_external_store(
1127 table_name,
1128 aws_creds.clone(),
1129 ®ion,
1130 dynamo_endpoint,
1131 "lancedb",
1132 )
1133 .await?,
1134 }))
1135 }
1136 _ => Ok(Arc::new(UnsafeCommitHandler)),
1137 }
1138}
1139
/// Returns the DynamoDB endpoint override: the `dynamodb_endpoint` storage
/// option when present, otherwise the `DYNAMODB_ENDPOINT` environment variable.
#[cfg(feature = "dynamodb")]
fn get_dynamodb_endpoint(storage_options: &StorageOptions) -> Option<String> {
    storage_options
        .0
        .get("dynamodb_endpoint")
        .cloned()
        .or_else(|| std::env::var("DYNAMODB_ENDPOINT").ok())
}
1148
/// Errors that a commit attempt can produce.
#[derive(Debug)]
pub enum CommitError {
    /// The target manifest version already exists.
    CommitConflict,
    /// Any other failure encountered while committing.
    OtherError(Error),
}
1157
1158impl From<Error> for CommitError {
1159 fn from(e: Error) -> Self {
1160 Self::OtherError(e)
1161 }
1162}
1163
1164impl From<CommitError> for Error {
1165 fn from(e: CommitError) -> Self {
1166 match e {
1167 CommitError::CommitConflict => Self::internal("Commit conflict".to_string()),
1168 CommitError::OtherError(e) => e,
1169 }
1170 }
1171}
1172
/// Set once [`UnsafeCommitHandler`] has logged its warning about unsafe commits.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
1175
/// Commit handler that writes the manifest directly to its final path without
/// checking for an existing manifest; its own warning says concurrent writes
/// may result in data loss.
pub struct UnsafeCommitHandler;
1180
1181#[async_trait::async_trait]
1182#[allow(clippy::too_many_arguments)]
1183impl CommitHandler for UnsafeCommitHandler {
1184 async fn commit(
1185 &self,
1186 manifest: &mut Manifest,
1187 indices: Option<Vec<IndexMetadata>>,
1188 base_path: &Path,
1189 object_store: &ObjectStore,
1190 manifest_writer: ManifestWriter,
1191 naming_scheme: ManifestNamingScheme,
1192 transaction: Option<Transaction>,
1193 ) -> std::result::Result<ManifestLocation, CommitError> {
1194 if !WARNED_ON_UNSAFE_COMMIT.load(std::sync::atomic::Ordering::Relaxed) {
1196 WARNED_ON_UNSAFE_COMMIT.store(true, std::sync::atomic::Ordering::Relaxed);
1197 log::warn!(
1198 "Using unsafe commit handler. Concurrent writes may result in data loss. \
1199 Consider providing a commit handler that prevents conflicting writes."
1200 );
1201 }
1202
1203 let version_path = naming_scheme.manifest_path(base_path, manifest.version);
1204 let res =
1205 manifest_writer(object_store, manifest, indices, &version_path, transaction).await?;
1206
1207 write_version_hint(object_store, base_path, manifest.version).await;
1208
1209 Ok(ManifestLocation {
1210 version: manifest.version,
1211 size: Some(res.size as u64),
1212 naming_scheme,
1213 path: version_path,
1214 e_tag: res.e_tag,
1215 })
1216 }
1217}
1218
1219impl Debug for UnsafeCommitHandler {
1220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1221 f.debug_struct("UnsafeCommitHandler").finish()
1222 }
1223}
1224
/// A lock used to serialise commits.
///
/// Any `CommitLock` that is `Send + Sync` is a [`CommitHandler`] through the
/// blanket impl in this module, which takes the lock before writing the
/// manifest.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
    /// Handle returned by [`CommitLock::lock`], released when the commit
    /// attempt finishes.
    type Lease: CommitLease;

    /// Acquires the lock for committing `version`.
    async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
1244
/// A lock held for the duration of one commit attempt.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
    /// Releases the lease; `success` tells whether the commit succeeded.
    async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
1250
1251#[async_trait::async_trait]
1252impl<T: CommitLock + Send + Sync> CommitHandler for T {
1253 async fn commit(
1254 &self,
1255 manifest: &mut Manifest,
1256 indices: Option<Vec<IndexMetadata>>,
1257 base_path: &Path,
1258 object_store: &ObjectStore,
1259 manifest_writer: ManifestWriter,
1260 naming_scheme: ManifestNamingScheme,
1261 transaction: Option<Transaction>,
1262 ) -> std::result::Result<ManifestLocation, CommitError> {
1263 let path = naming_scheme.manifest_path(base_path, manifest.version);
1264 let lease = self.lock(manifest.version).await?;
1267
1268 match object_store.inner.head(&path).await {
1270 Ok(_) => {
1271 lease.release(false).await?;
1274
1275 return Err(CommitError::CommitConflict);
1276 }
1277 Err(ObjectStoreError::NotFound { .. }) => {}
1278 Err(e) => {
1279 lease.release(false).await?;
1282
1283 return Err(CommitError::OtherError(e.into()));
1284 }
1285 }
1286 let res = manifest_writer(object_store, manifest, indices, &path, transaction).await;
1287
1288 lease.release(res.is_ok()).await?;
1290
1291 let res = res?;
1292
1293 write_version_hint(object_store, base_path, manifest.version).await;
1294
1295 Ok(ManifestLocation {
1296 version: manifest.version,
1297 size: Some(res.size as u64),
1298 naming_scheme,
1299 path,
1300 e_tag: res.e_tag,
1301 })
1302 }
1303}
1304
1305#[async_trait::async_trait]
1306impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
1307 async fn commit(
1308 &self,
1309 manifest: &mut Manifest,
1310 indices: Option<Vec<IndexMetadata>>,
1311 base_path: &Path,
1312 object_store: &ObjectStore,
1313 manifest_writer: ManifestWriter,
1314 naming_scheme: ManifestNamingScheme,
1315 transaction: Option<Transaction>,
1316 ) -> std::result::Result<ManifestLocation, CommitError> {
1317 self.as_ref()
1318 .commit(
1319 manifest,
1320 indices,
1321 base_path,
1322 object_store,
1323 manifest_writer,
1324 naming_scheme,
1325 transaction,
1326 )
1327 .await
1328 }
1329}
1330
/// Commit handler that writes the manifest to a staging path and then moves it
/// into place with `rename_if_not_exists`.
pub struct RenameCommitHandler;
1335
1336#[async_trait::async_trait]
1337impl CommitHandler for RenameCommitHandler {
1338 async fn commit(
1339 &self,
1340 manifest: &mut Manifest,
1341 indices: Option<Vec<IndexMetadata>>,
1342 base_path: &Path,
1343 object_store: &ObjectStore,
1344 manifest_writer: ManifestWriter,
1345 naming_scheme: ManifestNamingScheme,
1346 transaction: Option<Transaction>,
1347 ) -> std::result::Result<ManifestLocation, CommitError> {
1348 let path = naming_scheme.manifest_path(base_path, manifest.version);
1352 let tmp_path = make_staging_manifest_path(&path)?;
1353
1354 let res = manifest_writer(object_store, manifest, indices, &tmp_path, transaction).await?;
1355
1356 match object_store
1357 .inner
1358 .rename_if_not_exists(&tmp_path, &path)
1359 .await
1360 {
1361 Ok(_) => {
1362 write_version_hint(object_store, base_path, manifest.version).await;
1364 Ok(ManifestLocation {
1365 version: manifest.version,
1366 path,
1367 size: Some(res.size as u64),
1368 naming_scheme,
1369 e_tag: None, })
1371 }
1372 Err(ObjectStoreError::AlreadyExists { .. }) => {
1373 let _ = object_store.delete(&tmp_path).await;
1376
1377 return Err(CommitError::CommitConflict);
1378 }
1379 Err(e) => {
1380 return Err(CommitError::OtherError(e.into()));
1382 }
1383 }
1384 }
1385}
1386
1387impl Debug for RenameCommitHandler {
1388 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1389 f.debug_struct("RenameCommitHandler").finish()
1390 }
1391}
1392
1393pub struct ConditionalPutCommitHandler;
1394
1395#[async_trait::async_trait]
1396impl CommitHandler for ConditionalPutCommitHandler {
1397 async fn commit(
1398 &self,
1399 manifest: &mut Manifest,
1400 indices: Option<Vec<IndexMetadata>>,
1401 base_path: &Path,
1402 object_store: &ObjectStore,
1403 manifest_writer: ManifestWriter,
1404 naming_scheme: ManifestNamingScheme,
1405 transaction: Option<Transaction>,
1406 ) -> std::result::Result<ManifestLocation, CommitError> {
1407 let path = naming_scheme.manifest_path(base_path, manifest.version);
1408
1409 let memory_store = ObjectStore::memory();
1410 let dummy_path = "dummy";
1411 manifest_writer(
1412 &memory_store,
1413 manifest,
1414 indices,
1415 &dummy_path.into(),
1416 transaction,
1417 )
1418 .await?;
1419 let dummy_data = memory_store.read_one_all(&dummy_path.into()).await?;
1420 let size = dummy_data.len() as u64;
1421 let res = object_store
1422 .inner
1423 .put_opts(
1424 &path,
1425 dummy_data.into(),
1426 PutOptions {
1427 mode: object_store::PutMode::Create,
1428 ..Default::default()
1429 },
1430 )
1431 .await
1432 .map_err(|err| match err {
1433 ObjectStoreError::AlreadyExists { .. } | ObjectStoreError::Precondition { .. } => {
1434 CommitError::CommitConflict
1435 }
1436 _ => CommitError::OtherError(err.into()),
1437 })?;
1438
1439 write_version_hint(object_store, base_path, manifest.version).await;
1440
1441 Ok(ManifestLocation {
1442 version: manifest.version,
1443 path,
1444 size: Some(size),
1445 naming_scheme,
1446 e_tag: res.e_tag,
1447 })
1448 }
1449}
1450
1451impl Debug for ConditionalPutCommitHandler {
1452 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1453 f.debug_struct("ConditionalPutCommitHandler").finish()
1454 }
1455}
1456
/// Settings that tune how a commit is carried out.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Retry budget for a commit; defaults to 20.
    /// NOTE(review): presumably counts retries after a commit conflict — confirm against the caller.
    pub num_retries: u32,
    /// When `true`, automatic cleanup is skipped; defaults to `false`.
    /// NOTE(review): the exact cleanup this refers to is not visible here — confirm against the caller.
    pub skip_auto_cleanup: bool,
}

impl Default for CommitConfig {
    /// Returns the default configuration: 20 retries, automatic cleanup not skipped.
    fn default() -> Self {
        let num_retries = 20;
        let skip_auto_cleanup = false;
        Self {
            num_retries,
            skip_auto_cleanup,
        }
    }
}
1472
1473#[cfg(test)]
1474mod tests {
1475 use lance_core::utils::tempfile::TempObjDir;
1476
1477 use super::*;
1478
#[test]
fn test_manifest_naming_scheme() {
    // Exercises path construction, version parsing and scheme detection for
    // both naming schemes. The V2 file names used here are `u64::MAX - version`.
    let base = Path::from("base");
    let v1 = ManifestNamingScheme::V1;
    let v2 = ManifestNamingScheme::V2;

    // (scheme, version, expected manifest path)
    let path_cases = [
        (v1, 0, "base/_versions/0.manifest"),
        (v1, 42, "base/_versions/42.manifest"),
        (v2, 0, "base/_versions/18446744073709551615.manifest"),
        (v2, 42, "base/_versions/18446744073709551573.manifest"),
    ];
    for (scheme, version, expected_path) in path_cases {
        assert_eq!(
            scheme.manifest_path(&base, version),
            Path::from(expected_path)
        );
    }

    // (scheme, file name, expected version); names with a trailing
    // `-<uuid>` suffix must still parse.
    let parse_cases = [
        (v1, "0.manifest", 0),
        (v1, "42.manifest", 42),
        (
            v1,
            "42.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
            42,
        ),
        (v2, "18446744073709551615.manifest", 0),
        (v2, "18446744073709551573.manifest", 42),
        (
            v2,
            "18446744073709551573.manifest-cee4fbbb-eb19-4ea3-8ca7-54f5ec33dedc",
            42,
        ),
    ];
    for (scheme, file_name, expected_version) in parse_cases {
        assert_eq!(scheme.parse_version(file_name), Some(expected_version));
    }

    // (file name, expected detected scheme)
    let detect_cases = [
        ("0.manifest", Some(v1)),
        ("18446744073709551615.manifest", Some(v2)),
        ("something else", None),
    ];
    for (file_name, expected_scheme) in detect_cases {
        assert_eq!(
            ManifestNamingScheme::detect_scheme(file_name),
            expected_scheme
        );
    }
}
1523
1524 #[tokio::test]
1525 async fn test_manifest_naming_migration() {
1526 let object_store = ObjectStore::memory();
1527 let base = Path::from("base");
1528 let versions_dir = base.clone().join(VERSIONS_DIR);
1529
1530 let original_files = vec![
1532 versions_dir.clone().join("irrelevant"),
1533 ManifestNamingScheme::V1.manifest_path(&base, 0),
1534 ManifestNamingScheme::V2.manifest_path(&base, 1),
1535 ];
1536 for path in original_files {
1537 object_store.put(&path, b"".as_slice()).await.unwrap();
1538 }
1539
1540 migrate_scheme_to_v2(&object_store, &base).await.unwrap();
1541
1542 let expected_files = vec![
1543 ManifestNamingScheme::V2.manifest_path(&base, 1),
1544 ManifestNamingScheme::V2.manifest_path(&base, 0),
1545 versions_dir.clone().join("irrelevant"),
1546 ];
1547 let actual_files = object_store
1548 .inner
1549 .list(Some(&versions_dir))
1550 .map_ok(|res| res.location)
1551 .try_collect::<Vec<_>>()
1552 .await
1553 .unwrap();
1554 assert_eq!(actual_files, expected_files);
1555 }
1556
1557 #[tokio::test]
1558 #[rstest::rstest]
1559 async fn test_list_manifests_sorted(
1560 #[values(true, false)] lexical_list_store: bool,
1561 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1562 naming_scheme: ManifestNamingScheme,
1563 ) {
1564 let tempdir;
1565 let (object_store, base) = if lexical_list_store {
1566 (Box::new(ObjectStore::memory()), Path::from("base"))
1567 } else {
1568 tempdir = TempObjDir::default();
1569 let path = tempdir.clone().join("base");
1570 let store = Box::new(ObjectStore::local());
1571 assert!(!store.list_is_lexically_ordered);
1572 (store, path)
1573 };
1574
1575 let mut expected_paths = Vec::new();
1577 for i in (0..12).rev() {
1578 let path = naming_scheme.manifest_path(&base, i);
1579 object_store.put(&path, b"".as_slice()).await.unwrap();
1580 expected_paths.push(path);
1581 }
1582
1583 let actual_versions = ConditionalPutCommitHandler
1584 .list_manifest_locations(&base, &object_store, true)
1585 .map_ok(|location| location.path)
1586 .try_collect::<Vec<_>>()
1587 .await
1588 .unwrap();
1589
1590 assert_eq!(actual_versions, expected_paths);
1591 }
1592
1593 #[tokio::test]
1594 #[rstest::rstest]
1595 async fn test_current_manifest_path(
1596 #[values(true, false)] lexical_list_store: bool,
1597 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1598 naming_scheme: ManifestNamingScheme,
1599 ) {
1600 let mut object_store = ObjectStore::memory();
1603 object_store.list_is_lexically_ordered = lexical_list_store;
1604 let object_store = Box::new(object_store);
1605 let base = Path::from("base");
1606
1607 for version in [5, 2, 11, 0, 8, 3, 10, 1, 7, 4, 9, 6] {
1609 let path = naming_scheme.manifest_path(&base, version);
1610 object_store.put(&path, b"".as_slice()).await.unwrap();
1611 }
1612
1613 let location = current_manifest_path(&object_store, &base).await.unwrap();
1614
1615 assert_eq!(location.version, 11);
1616 assert_eq!(location.naming_scheme, naming_scheme);
1617 assert_eq!(location.path, naming_scheme.manifest_path(&base, 11));
1618 }
1619
1620 fn non_lexical_memory_store() -> Box<ObjectStore> {
1623 let mut object_store = ObjectStore::memory();
1624 object_store.list_is_lexically_ordered = false;
1625 Box::new(object_store)
1626 }
1627
1628 #[tokio::test]
1629 async fn test_write_version_hint() {
1630 let base = Path::from("base");
1631
1632 let lexical = ObjectStore::memory();
1634 write_version_hint(&lexical, &base, 42).await;
1635 assert_eq!(read_version_from_hint(&lexical, &base).await, None);
1636
1637 let object_store = non_lexical_memory_store();
1638 write_version_hint(&object_store, &base, 42).await;
1639 assert_eq!(read_version_from_hint(&object_store, &base).await, Some(42));
1640
1641 write_version_hint(&object_store, &base, 100).await;
1643 assert_eq!(
1644 read_version_from_hint(&object_store, &base).await,
1645 Some(100)
1646 );
1647
1648 write_version_hint(
1650 &object_store,
1651 &base,
1652 crate::format::DETACHED_VERSION_MASK | 7,
1653 )
1654 .await;
1655 assert_eq!(
1656 read_version_from_hint(&object_store, &base).await,
1657 Some(100)
1658 );
1659
1660 let hint_path = version_hint_path(&base);
1662 object_store
1663 .put(&hint_path, b"not json".as_slice())
1664 .await
1665 .unwrap();
1666 assert_eq!(read_version_from_hint(&object_store, &base).await, None);
1667 }
1668
1669 #[tokio::test]
1670 #[rstest::rstest]
1671 async fn test_read_version_hint_and_probe(
1672 #[values(ManifestNamingScheme::V1, ManifestNamingScheme::V2)]
1673 naming_scheme: ManifestNamingScheme,
1674 ) {
1675 let object_store = non_lexical_memory_store();
1676 let base = Path::from("base");
1677
1678 assert!(
1680 read_version_hint_and_probe(&object_store, &base)
1681 .await
1682 .is_none()
1683 );
1684
1685 for version in 1..=5 {
1686 object_store
1687 .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1688 .await
1689 .unwrap();
1690 }
1691
1692 write_version_hint(&object_store, &base, 3).await;
1694 let location = read_version_hint_and_probe(&object_store, &base)
1695 .await
1696 .unwrap();
1697 assert_eq!(location.version, 5);
1698 assert_eq!(location.naming_scheme, naming_scheme);
1699
1700 write_version_hint(&object_store, &base, 5).await;
1702 let location = read_version_hint_and_probe(&object_store, &base)
1703 .await
1704 .unwrap();
1705 assert_eq!(location.version, 5);
1706
1707 write_version_hint(&object_store, &base, 10).await;
1709 assert!(
1710 read_version_hint_and_probe(&object_store, &base)
1711 .await
1712 .is_none()
1713 );
1714 }
1715
1716 #[tokio::test]
1717 async fn test_list_manifests_since_version_with_hint() {
1718 let object_store = non_lexical_memory_store();
1719 let base = Path::from("base");
1720 let scheme = ManifestNamingScheme::V2;
1721
1722 for version in 1..=10 {
1723 object_store
1724 .put(&scheme.manifest_path(&base, version), b"".as_slice())
1725 .await
1726 .unwrap();
1727 }
1728
1729 assert!(
1731 list_manifests_since_version_with_hint(&object_store, &base, 7)
1732 .await
1733 .is_none()
1734 );
1735
1736 write_version_hint(&object_store, &base, 10).await;
1738 assert!(matches!(
1739 list_manifests_since_version_with_hint(&object_store, &base, 10).await,
1740 Some(v) if v.is_empty()
1741 ));
1742
1743 let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1746 .await
1747 .unwrap();
1748 assert_eq!(
1749 locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1750 vec![10, 9, 8]
1751 );
1752
1753 write_version_hint(&object_store, &base, 8).await;
1755 let locations = list_manifests_since_version_with_hint(&object_store, &base, 7)
1756 .await
1757 .unwrap();
1758 assert_eq!(
1759 locations.iter().map(|l| l.version).collect::<Vec<_>>(),
1760 vec![10, 9, 8]
1761 );
1762
1763 write_version_hint(&object_store, &base, 20).await;
1765 assert!(
1766 list_manifests_since_version_with_hint(&object_store, &base, 7)
1767 .await
1768 .is_none()
1769 );
1770 }
1771
1772 #[tokio::test]
1773 async fn test_current_manifest_path_with_hint_non_lexical() {
1774 let object_store = non_lexical_memory_store();
1776 let base = Path::from("base");
1777 let naming_scheme = ManifestNamingScheme::V2;
1778
1779 for version in 1..=100 {
1780 object_store
1781 .put(&naming_scheme.manifest_path(&base, version), b"".as_slice())
1782 .await
1783 .unwrap();
1784 }
1785
1786 write_version_hint(&object_store, &base, 98).await;
1788 let location = current_manifest_path(&object_store, &base).await.unwrap();
1789 assert_eq!(location.version, 100);
1790 }
1791
1792 #[tokio::test]
1793 async fn test_current_manifest_path_with_stale_hint_falls_back_to_listing() {
1794 let object_store = non_lexical_memory_store();
1795 let base = Path::from("base");
1796 let naming_scheme = ManifestNamingScheme::V2;
1797
1798 object_store
1800 .put(&naming_scheme.manifest_path(&base, 5), b"".as_slice())
1801 .await
1802 .unwrap();
1803 write_version_hint(&object_store, &base, 10).await;
1804
1805 let location = current_manifest_path(&object_store, &base).await.unwrap();
1807 assert_eq!(location.version, 5);
1808 }
1809
#[test]
fn test_parse_detached_version() {
    // (file name, expected parse result)
    let cases: [(&str, Option<u64>); 5] = [
        // `d`-prefixed names parse to the number that follows the prefix.
        ("d12345.manifest", Some(12345)),
        ("d9223372036854775808.manifest", Some(9223372036854775808)),
        // Names without the `d` prefix are rejected.
        ("12345.manifest", None),
        ("18446744073709551615.manifest", None),
        // The `.manifest` extension is required.
        ("d12345", None),
    ];

    for (file_name, expected) in cases {
        assert_eq!(
            ManifestNamingScheme::parse_detached_version(file_name),
            expected
        );
    }
}
1837
1838 #[tokio::test]
1839 async fn test_list_detached_manifests() {
1840 use crate::format::DETACHED_VERSION_MASK;
1841 use futures::TryStreamExt;
1842
1843 let object_store = ObjectStore::memory();
1844 let base = Path::from("base");
1845 let versions_dir = base.clone().join(VERSIONS_DIR);
1846
1847 for version in [1, 2, 3] {
1849 let path = ManifestNamingScheme::V2.manifest_path(&base, version);
1850 object_store.put(&path, b"".as_slice()).await.unwrap();
1851 }
1852
1853 let detached_versions: Vec<u64> = vec![
1855 100 | DETACHED_VERSION_MASK,
1856 200 | DETACHED_VERSION_MASK,
1857 300 | DETACHED_VERSION_MASK,
1858 ];
1859 for version in &detached_versions {
1860 let path = versions_dir.clone().join(format!("d{}.manifest", version));
1861 object_store.put(&path, b"".as_slice()).await.unwrap();
1862 }
1863
1864 let detached_locations: Vec<ManifestLocation> =
1866 list_detached_manifests(&base, &object_store.inner)
1867 .try_collect()
1868 .await
1869 .unwrap();
1870
1871 assert_eq!(detached_locations.len(), 3);
1872 for loc in &detached_locations {
1873 assert_eq!(loc.naming_scheme, ManifestNamingScheme::V2);
1874 }
1875
1876 let mut found_versions: Vec<u64> = detached_locations.iter().map(|l| l.version).collect();
1877 found_versions.sort();
1878 let mut expected_versions = detached_versions.clone();
1879 expected_versions.sort();
1880 assert_eq!(found_versions, expected_versions);
1881 }
1882
1883 #[tokio::test]
1884 async fn test_commit_handler_from_url_memory_schemes() {
1885 for url in ["memory://bucket-a/ds", "shared-memory://bucket-a/ds"] {
1890 let handler = commit_handler_from_url(url, &None).await.unwrap();
1891 assert_eq!(
1892 format!("{:?}", handler),
1893 "ConditionalPutCommitHandler",
1894 "{url} should route to ConditionalPutCommitHandler",
1895 );
1896 }
1897 }
1898}