use std::fmt::Debug;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use aws_credential_types::provider::error::CredentialsError;
use aws_credential_types::provider::ProvideCredentials;
use futures::{
future::{self, BoxFuture},
stream::BoxStream,
StreamExt, TryStreamExt,
};
use object_store::aws::AwsCredentialProvider;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use snafu::{location, Location};
use url::Url;
#[cfg(feature = "dynamodb")]
pub mod dynamodb;
pub mod external_manifest;
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStoreExt;
use lance_io::object_store::ObjectStoreParams;
#[cfg(feature = "dynamodb")]
use {
self::external_manifest::{ExternalManifestCommitHandler, ExternalManifestStore},
aws_credential_types::cache::CredentialsCache,
lance_io::object_store::{build_aws_credential, StorageOptions},
object_store::aws::AmazonS3ConfigKey,
std::borrow::Cow,
};
use crate::format::{Index, Manifest};
/// File name, directly under the dataset base path, that `latest_manifest_path` resolves to.
const LATEST_MANIFEST_NAME: &str = "_latest.manifest";
/// Name of the directory under the dataset base path that holds the versioned manifest files.
const VERSIONS_DIR: &str = "_versions";
/// Extension (without the leading dot) of versioned manifest files: `{version}.manifest`.
const MANIFEST_EXTENSION: &str = "manifest";
/// Signature of the callback that commit handlers invoke to persist a manifest.
///
/// The handlers in this module call it with the object store, the manifest being
/// committed, the optional index list, and the destination path (either the final
/// versioned path or a temporary path that is renamed afterwards). The returned
/// future resolves to `Ok(())` on success.
pub type ManifestWriter = for<'a> fn(
object_store: &'a dyn ObjectStore,
manifest: &'a mut Manifest,
indices: Option<Vec<Index>>,
path: &'a Path,
) -> BoxFuture<'a, Result<()>>;
/// Build the path of the manifest file for `version`.
///
/// The result is `{base}/_versions/{version}.manifest`.
pub fn manifest_path(base: &Path, version: u64) -> Path {
    let file_name = format!("{version}.{MANIFEST_EXTENSION}");
    let versions_dir = base.child(VERSIONS_DIR);
    versions_dir.child(file_name)
}
/// Build the path of the "latest manifest" file: `{base}/_latest.manifest`.
pub fn latest_manifest_path(base: &Path) -> Path {
base.child(LATEST_MANIFEST_NAME)
}
async fn current_manifest_path(object_store: &dyn ObjectStore, base: &Path) -> Result<Path> {
let manifest_files = object_store
.list_with_delimiter(Some(&base.child(VERSIONS_DIR)))
.await?;
let current = manifest_files
.objects
.into_iter()
.map(|meta| meta.location)
.filter(|path| {
path.filename().is_some() && path.filename().unwrap().ends_with(MANIFEST_EXTENSION)
})
.filter_map(|path| {
let version = path
.filename()
.unwrap()
.split_once('.')
.and_then(|(version_str, _)| version_str.parse::<u64>().ok())?;
Some((version, path))
})
.max_by_key(|(version, _)| *version)
.map(|(_, path)| path);
if let Some(path) = current {
Ok(path)
} else {
Err(Error::NotFound {
uri: manifest_path(base, 1).to_string(),
location: location!(),
})
}
}
async fn list_manifests<'a>(
base_path: &Path,
object_store: &'a dyn ObjectStore,
) -> Result<BoxStream<'a, Result<Path>>> {
let base_path = base_path.clone();
Ok(object_store
.read_dir_all(&base_path.child(VERSIONS_DIR), None)
.await?
.try_filter_map(|obj_meta| {
if obj_meta.location.extension() == Some("manifest") {
future::ready(Ok(Some(obj_meta.location)))
} else {
future::ready(Ok(None))
}
})
.boxed())
}
pub fn parse_version_from_path(path: &Path) -> Result<u64> {
path.filename()
.and_then(|name| name.split_once('.'))
.filter(|(_, extension)| *extension == "manifest")
.and_then(|(version, _)| version.parse::<u64>().ok())
.ok_or(Error::Internal {
message: format!("Expected manifest file, but found {}", path),
location: location!(),
})
}
fn make_staging_manifest_path(base: &Path) -> Result<Path> {
let id = uuid::Uuid::new_v4().to_string();
Path::parse(format!("{base}-{id}")).map_err(|e| Error::IO {
message: format!("failed to parse path: {}", e),
location: location!(),
})
}
/// Publish the manifest at `from_path` as `{base_path}/_latest.manifest`.
///
/// The manifest is first copied to a uniquely named staging path next to `from_path`
/// and then renamed over the latest-manifest path.
///
/// # Errors
///
/// Returns the underlying object-store error if the copy or the rename fails. When
/// the rename fails, the staging copy is removed on a best-effort basis so that it
/// does not accumulate in the versions directory.
async fn write_latest_manifest(
    from_path: &Path,
    base_path: &Path,
    object_store: &dyn ObjectStore,
) -> Result<()> {
    let latest_path = latest_manifest_path(base_path);
    let staging_path = make_staging_manifest_path(from_path)?;

    object_store.copy(from_path, &staging_path).await?;

    if let Err(rename_err) = object_store.rename(&staging_path, &latest_path).await {
        // Best-effort cleanup; the rename failure is the error worth reporting.
        let _ = object_store.delete(&staging_path).await;
        return Err(rename_err.into());
    }
    Ok(())
}
/// Name of the single URL query parameter that `commit_handler_from_url` requires on
/// `s3+ddb://` URLs; its value is the DynamoDB table name.
#[cfg(feature = "dynamodb")]
const DDB_URL_QUERY_KEY: &str = "ddbTableName";
/// Strategy for resolving manifest locations and committing new manifest versions.
///
/// The path-resolution methods have default implementations based on the
/// `{base}/_versions/{version}.manifest` layout; implementors must supply `commit`.
#[async_trait::async_trait]
pub trait CommitHandler: Debug + Send + Sync {
    /// Return the path of the manifest with the highest version under `base_path`.
    ///
    /// The default implementation lists the versions directory in `object_store`.
    async fn resolve_latest_version(
        &self,
        base_path: &Path,
        object_store: &dyn ObjectStore,
    ) -> std::result::Result<Path, Error> {
        current_manifest_path(object_store, base_path).await
    }

    /// Return the version number of the latest manifest.
    ///
    /// The default implementation resolves the latest manifest path and parses the
    /// version out of its file name.
    async fn resolve_latest_version_id(
        &self,
        base_path: &Path,
        object_store: &dyn ObjectStore,
    ) -> Result<u64> {
        let latest = self.resolve_latest_version(base_path, object_store).await?;
        parse_version_from_path(&latest)
    }

    /// Return the manifest path for `version`.
    ///
    /// The default implementation computes the path without touching the store.
    async fn resolve_version(
        &self,
        base_path: &Path,
        version: u64,
        _object_store: &dyn ObjectStore,
    ) -> std::result::Result<Path, Error> {
        let path = manifest_path(base_path, version);
        Ok(path)
    }

    /// Stream the paths of all manifest files under `base_path`.
    async fn list_manifests<'a>(
        &self,
        base_path: &Path,
        object_store: &'a dyn ObjectStore,
    ) -> Result<BoxStream<'a, Result<Path>>> {
        // Delegates to the module-level function of the same name.
        list_manifests(base_path, object_store).await
    }

    /// Commit `manifest` as a new version, using `manifest_writer` to write the file.
    ///
    /// Implementations in this module return `CommitError::CommitConflict` when they
    /// detect that the target version already exists.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &dyn ObjectStore,
        manifest_writer: ManifestWriter,
    ) -> std::result::Result<(), CommitError>;
}
/// Adapter exposing an `object_store` AWS credential provider through the AWS SDK's
/// `ProvideCredentials` trait.
#[derive(Debug)]
struct OSObjectStoreToAwsCredAdaptor(AwsCredentialProvider);

impl ProvideCredentials for OSObjectStoreToAwsCredAdaptor {
    /// Fetch credentials from the wrapped provider and convert them to SDK credentials.
    ///
    /// The reported expiry is always ten minutes from the time of the call. Failures
    /// of the wrapped provider are surfaced as `CredentialsError::provider_error`.
    fn provide_credentials<'a>(
        &'a self,
    ) -> aws_credential_types::provider::future::ProvideCredentials<'a>
    where
        Self: 'a,
    {
        /// Lifetime reported for every set of credentials handed to the SDK.
        const REPORTED_TTL: Duration = Duration::from_secs(60 * 10);

        aws_credential_types::provider::future::ProvideCredentials::new(async {
            let creds = match self.0.get_credential().await {
                Ok(creds) => creds,
                Err(e) => return Err(CredentialsError::provider_error(Box::new(e))),
            };
            let expires_at = SystemTime::now()
                .checked_add(REPORTED_TTL)
                .expect("overflow");
            Ok(aws_credential_types::Credentials::new(
                &creds.key_id,
                &creds.secret_key,
                creds.token.clone(),
                Some(expires_at),
                "",
            ))
        })
    }
}
/// Create a DynamoDB-backed external manifest store.
///
/// The client is configured with `region` and with `creds` wrapped in
/// `OSObjectStoreToAwsCredAdaptor`, with SDK-side credential caching disabled. If the
/// `DYNAMODB_ENDPOINT` environment variable is set and readable, its value is used as
/// the endpoint URL.
///
/// # Errors
///
/// Propagates any error from `DynamoDBExternalManifestStore::new_external_store`.
#[cfg(feature = "dynamodb")]
async fn build_dynamodb_external_store(
    table_name: &str,
    creds: AwsCredentialProvider,
    region: &str,
    app_name: &str,
) -> Result<Arc<dyn ExternalManifestStore>> {
    use super::commit::dynamodb::DynamoDBExternalManifestStore;
    use aws_sdk_dynamodb::{config::Region, Client};
    use std::env;

    let mut builder = aws_sdk_dynamodb::config::Builder::new()
        .region(Some(Region::new(region.to_string())))
        .credentials_provider(OSObjectStoreToAwsCredAdaptor(creds))
        .credentials_cache(CredentialsCache::no_caching());

    // Optional endpoint override taken from the environment.
    if let Ok(endpoint) = env::var("DYNAMODB_ENDPOINT") {
        builder = builder.endpoint_url(endpoint);
    }

    let client = Client::from_conf(builder.build());
    DynamoDBExternalManifestStore::new_external_store(client.into(), table_name, app_name).await
}
/// Choose a commit handler based on the scheme of `url_or_path`.
///
/// * Input that does not parse as a URL, and (on Windows only) URLs with a
///   single-character scheme (presumably drive letters such as `C:`), use
///   `RenameCommitHandler`.
/// * `s3` uses `UnsafeCommitHandler`.
/// * `s3+ddb` uses `ExternalManifestCommitHandler` backed by DynamoDB. The URL must
///   carry exactly one query parameter, `ddbTableName`, with a non-empty value. This
///   requires the `dynamodb` feature; without it an `InvalidInput` error is returned.
/// * `gs`, `az`, `file` and `memory` use `RenameCommitHandler`.
///
/// `options` is only consulted for `s3+ddb` (storage options, region and AWS
/// credentials).
///
/// # Errors
///
/// * `Error::InvalidInput` for a malformed `s3+ddb` URL or a missing `dynamodb` feature.
/// * `Error::IO` for any other, unsupported scheme.
/// * Any error raised while building AWS credentials or the DynamoDB store.
pub async fn commit_handler_from_url(
    url_or_path: &str,
    #[allow(unused_variables)] options: &Option<ObjectStoreParams>,
) -> Result<Arc<dyn CommitHandler>> {
    let url = match Url::parse(url_or_path) {
        Ok(url) if url.scheme().len() == 1 && cfg!(windows) => {
            return Ok(Arc::new(RenameCommitHandler));
        }
        Ok(url) => url,
        Err(_) => {
            // Not a URL: treat it as a local path.
            return Ok(Arc::new(RenameCommitHandler));
        }
    };

    match url.scheme() {
        "s3" => Ok(Arc::new(UnsafeCommitHandler)),
        #[cfg(not(feature = "dynamodb"))]
        "s3+ddb" => Err(Error::InvalidInput {
            source: "`s3+ddb://` scheme requires `dynamodb` feature to be enabled".into(),
            location: location!(),
        }),
        #[cfg(feature = "dynamodb")]
        "s3+ddb" => {
            if url.query_pairs().count() != 1 {
                return Err(Error::InvalidInput {
                    source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
                        .into(),
                    location: location!(),
                });
            }
            let table_name = match url.query_pairs().next() {
                Some((Cow::Borrowed(key), Cow::Borrowed(table_name)))
                    if key == DDB_URL_QUERY_KEY =>
                {
                    if table_name.is_empty() {
                        return Err(Error::InvalidInput {
                            source: "`s3+ddb://` scheme requires non empty dynamodb table name"
                                .into(),
                            location: location!(),
                        });
                    }
                    table_name
                }
                _ => {
                    return Err(Error::InvalidInput {
                        source: "`s3+ddb://` scheme and expects exactly one query `ddbTableName`"
                            .into(),
                        location: location!(),
                    });
                }
            };

            let options = options.clone().unwrap_or_default();
            let storage_options =
                StorageOptions(options.storage_options.unwrap_or_default()).as_s3_options();
            let region = storage_options
                .get(&AmazonS3ConfigKey::Region)
                .map(|s| s.to_string());

            // `build_aws_credential` hands back the region to use alongside the credentials.
            let (aws_creds, region) = build_aws_credential(
                options.s3_credentials_refresh_offset,
                options.aws_credentials.clone(),
                region,
            )
            .await?;

            Ok(Arc::new(ExternalManifestCommitHandler {
                external_manifest_store: build_dynamodb_external_store(
                    table_name,
                    aws_creds.clone(),
                    &region,
                    "lancedb",
                )
                .await?,
            }))
        }
        "gs" | "az" | "file" | "memory" => Ok(Arc::new(RenameCommitHandler)),
        unknown_scheme => Err(Error::IO {
            message: format!("Unsupported URI scheme: {}", unknown_scheme),
            location: location!(),
        }),
    }
}
#[derive(Debug)]
pub enum CommitError {
CommitConflict,
OtherError(Error),
}
impl From<Error> for CommitError {
fn from(e: Error) -> Self {
Self::OtherError(e)
}
}
impl From<CommitError> for Error {
fn from(e: CommitError) -> Self {
match e {
CommitError::CommitConflict => Self::Internal {
message: "Commit conflict".to_string(),
location: location!(),
},
CommitError::OtherError(e) => e,
}
}
}
/// Process-wide flag recording whether the unsafe-commit warning has already been logged.
static WARNED_ON_UNSAFE_COMMIT: AtomicBool = AtomicBool::new(false);
/// Commit handler that writes the manifest straight to its final versioned path,
/// without taking a lock or checking whether that version already exists.
pub struct UnsafeCommitHandler;
#[async_trait::async_trait]
impl CommitHandler for UnsafeCommitHandler {
    /// Write the manifest directly to its versioned path, then refresh the latest
    /// manifest.
    ///
    /// Nothing prevents two writers from committing the same version. A warning about
    /// this is logged the first time this handler commits in the process.
    ///
    /// # Errors
    ///
    /// Returns `CommitError::OtherError` if resolving the path, writing the manifest,
    /// or updating the latest manifest fails.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &dyn ObjectStore,
        manifest_writer: ManifestWriter,
    ) -> std::result::Result<(), CommitError> {
        // `swap` tests and sets the flag in one atomic step, so concurrent first
        // commits cannot both observe `false` and log the warning twice.
        if !WARNED_ON_UNSAFE_COMMIT.swap(true, std::sync::atomic::Ordering::Relaxed) {
            log::warn!(
                "Using unsafe commit handler. Concurrent writes may result in data loss. \
                 Consider providing a commit handler that prevents conflicting writes."
            );
        }

        let version_path = self
            .resolve_version(base_path, manifest.version, object_store)
            .await?;
        manifest_writer(object_store, manifest, indices, &version_path).await?;
        write_latest_manifest(&version_path, base_path, object_store).await?;
        Ok(())
    }
}
impl Debug for UnsafeCommitHandler {
    /// Format as the bare type name; the handler has no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("UnsafeCommitHandler")
    }
}
/// A lock that guards commits of a given version.
///
/// Any `CommitLock + Send + Sync` type gets a `CommitHandler` implementation through
/// the blanket impl in this module, which calls `lock` before writing the manifest.
#[async_trait::async_trait]
pub trait CommitLock: Debug {
/// Lease type returned by `lock`; it must be released once the commit attempt ends.
type Lease: CommitLease;
/// Acquire the lock for committing `version`.
///
/// The blanket `CommitHandler` impl propagates any error returned here unchanged.
async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError>;
}
/// A lease obtained from `CommitLock::lock`.
#[async_trait::async_trait]
pub trait CommitLease: Send + Sync {
/// Release the lease. `success` reports whether the commit attempt made under the
/// lease succeeded; the blanket `CommitHandler` impl passes `false` on a conflict or
/// failed existence check and the outcome of the manifest write otherwise.
async fn release(&self, success: bool) -> std::result::Result<(), CommitError>;
}
#[async_trait::async_trait]
impl<T: CommitLock + Send + Sync> CommitHandler for T {
    /// Commit `manifest` while holding the lock for its version.
    ///
    /// Steps:
    /// 1. acquire the lease for `manifest.version`;
    /// 2. fail with `CommitError::CommitConflict` if the manifest file already exists;
    /// 3. write the manifest and, only if that succeeded, refresh the latest manifest;
    /// 4. release the lease, reporting whether the manifest write succeeded.
    ///
    /// # Errors
    ///
    /// * `CommitError::CommitConflict` if the version is already committed.
    /// * `CommitError::OtherError` for lock, store, write, or latest-manifest failures.
    ///   A manifest write error takes precedence over a latest-manifest error.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &dyn ObjectStore,
        manifest_writer: ManifestWriter,
    ) -> std::result::Result<(), CommitError> {
        let path = self
            .resolve_version(base_path, manifest.version, object_store)
            .await?;

        // Once the lease is held, every exit path below must release it, so fallible
        // work is captured in a `Result` instead of being returned early with `?`.
        let lease = self.lock(manifest.version).await?;

        match object_store.head(&path).await {
            Ok(_) => {
                // The manifest already exists: someone else committed this version.
                lease.release(false).await?;
                return Err(CommitError::CommitConflict);
            }
            Err(ObjectStoreError::NotFound { .. }) => {}
            Err(e) => {
                lease.release(false).await?;
                return Err(CommitError::OtherError(e.into()));
            }
        }

        let write_res = manifest_writer(object_store, manifest, indices, &path).await;

        // Publishing a manifest that was never written would only mask the real error.
        let latest_res = if write_res.is_ok() {
            write_latest_manifest(&path, base_path, object_store).await
        } else {
            Ok(())
        };

        lease.release(write_res.is_ok()).await?;

        write_res?;
        latest_res?;
        Ok(())
    }
}
#[async_trait::async_trait]
impl<T: CommitLock + Send + Sync> CommitHandler for Arc<T> {
    /// Forward the commit to the `T` inside the `Arc`, unchanged.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &dyn ObjectStore,
        manifest_writer: ManifestWriter,
    ) -> std::result::Result<(), CommitError> {
        let inner: &T = self.as_ref();
        inner
            .commit(manifest, indices, base_path, object_store, manifest_writer)
            .await
    }
}
/// Commit handler that writes the manifest to a temporary path and then moves it to
/// its final path with `rename_if_not_exists`, reporting a conflict if the final path
/// already exists.
pub struct RenameCommitHandler;
#[async_trait::async_trait]
impl CommitHandler for RenameCommitHandler {
    /// Write the manifest to a temporary sibling file, then rename it into place.
    ///
    /// The temporary file is named `.tmp_{final file name}_{uuid}` and lives in the
    /// same directory as the final manifest. After a successful rename the latest
    /// manifest is refreshed.
    ///
    /// # Errors
    ///
    /// * `CommitError::CommitConflict` if the final path already exists; the temporary
    ///   file is then deleted on a best-effort basis.
    /// * `CommitError::OtherError` for any other failure.
    async fn commit(
        &self,
        manifest: &mut Manifest,
        indices: Option<Vec<Index>>,
        base_path: &Path,
        object_store: &dyn ObjectStore,
        manifest_writer: ManifestWriter,
    ) -> std::result::Result<(), CommitError> {
        let final_path = self
            .resolve_version(base_path, manifest.version, object_store)
            .await?;

        // Swap the last path segment for a unique temporary file name.
        let mut segments: Vec<_> = final_path.parts().collect();
        let unique = uuid::Uuid::new_v4();
        let final_name = segments.pop().unwrap();
        let tmp_name = format!(
            ".tmp_{}_{}",
            final_name.as_ref(),
            unique.as_hyphenated()
        );
        segments.push(tmp_name.into());
        let tmp_path: Path = segments.into_iter().collect();

        manifest_writer(object_store, manifest, indices, &tmp_path).await?;

        match object_store
            .rename_if_not_exists(&tmp_path, &final_path)
            .await
        {
            Ok(_) => {}
            Err(ObjectStoreError::AlreadyExists { .. }) => {
                // Another writer won the race; discard the temporary file.
                let _ = object_store.delete(&tmp_path).await;
                return Err(CommitError::CommitConflict);
            }
            Err(e) => return Err(CommitError::OtherError(e.into())),
        }

        write_latest_manifest(&final_path, base_path, object_store).await?;
        Ok(())
    }
}
impl Debug for RenameCommitHandler {
    /// Format as the bare type name; the handler has no fields.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("RenameCommitHandler")
    }
}
/// Configuration for commit attempts.
#[derive(Debug, Clone)]
pub struct CommitConfig {
    /// Number of retries; defaults to 5.
    pub num_retries: u32,
}

impl Default for CommitConfig {
    /// Build the default configuration (`num_retries = 5`).
    fn default() -> Self {
        const DEFAULT_NUM_RETRIES: u32 = 5;
        CommitConfig {
            num_retries: DEFAULT_NUM_RETRIES,
        }
    }
}