use crate::checkpoint::{Checkpoint, CheckpointCreateResult};
use crate::config::{CheckpointOptions, GarbageCollectorOptions};
use crate::error::SlateDBError;
use crate::garbage_collector::GarbageCollector;
use crate::manifest::store::{ManifestStore, StoredManifest};
use crate::sst::SsTableFormat;
use crate::stats::StatRegistry;
use crate::tablestore::TableStore;
use crate::clone;
use fail_parallel::FailPointRegistry;
use futures::{StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectStore;
use std::env;
use std::error::Error;
use std::ops::RangeBounds;
use std::sync::Arc;
use tokio::runtime::Handle;
use uuid::Uuid;
/// Reads one manifest from the database at `path` and renders it as JSON.
///
/// When `maybe_id` is `Some(id)`, the manifest with that id is read. Otherwise the
/// latest manifest is read. The JSON encodes the `(id, manifest)` pair.
///
/// Returns `Ok(None)` when the manifest store reports no matching manifest.
///
/// # Errors
///
/// Propagates manifest-store read failures and JSON serialization failures.
pub async fn read_manifest(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    maybe_id: Option<u64>,
) -> Result<Option<String>, Box<dyn Error>> {
    let store = ManifestStore::new(path, object_store);
    let found = match maybe_id {
        Some(id) => store
            .try_read_manifest(id)
            .await?
            .map(|manifest| (id, manifest)),
        None => store.try_read_latest_manifest().await?,
    };
    let rendered = found
        .map(|entry| serde_json::to_string(&entry))
        .transpose()?;
    Ok(rendered)
}
/// Lists the manifests of the database at `path` that `range` selects, as JSON.
///
/// `range` is forwarded unchanged to `ManifestStore::list_manifests`.
///
/// # Errors
///
/// Propagates manifest-store listing failures and JSON serialization failures.
pub async fn list_manifests<R: RangeBounds<u64>>(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    range: R,
) -> Result<String, Box<dyn Error>> {
    let store = ManifestStore::new(path, object_store);
    let listing = store.list_manifests(range).await?;
    let json = serde_json::to_string(&listing)?;
    Ok(json)
}
/// Returns the checkpoints recorded in the latest manifest of the database at `path`.
///
/// # Errors
///
/// Propagates any failure from reading the latest manifest.
pub async fn list_checkpoints(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<Vec<Checkpoint>, Box<dyn Error>> {
    let store = ManifestStore::new(path, object_store);
    let latest = store.read_latest_manifest().await?;
    // The manifest id is not needed; only the checkpoint list is returned.
    let (_manifest_id, manifest) = latest;
    let checkpoints = manifest.core.checkpoints;
    Ok(checkpoints)
}
pub async fn delete_objects_with_prefix(
object_store: Arc<dyn ObjectStore>,
maybe_prefix: Option<&Path>,
) -> Result<(), Box<dyn Error>> {
let stream = object_store
.list(maybe_prefix)
.map_ok(|m| m.location)
.boxed();
object_store
.delete_stream(stream)
.try_collect::<Vec<Path>>()
.await
.map(|_| ())
.map_err(|e| e.into())
}
/// Builds an object store from environment variables.
///
/// Variables are first loaded from `env_file` (default `.env`) on a best-effort
/// basis. The provider is then chosen from `CLOUD_PROVIDER`, case-insensitively:
/// `local` is always available, `aws` needs the `aws` feature, and `azure` needs
/// the `azure` feature.
///
/// # Errors
///
/// Returns an error when `CLOUD_PROVIDER` is unset or unrecognized, or when the
/// selected provider fails to load.
pub fn load_object_store_from_env(
    env_file: Option<String>,
) -> Result<Arc<dyn ObjectStore>, Box<dyn Error>> {
    // Ignore failures here on purpose: the variables may already be set in the
    // process environment even when no env file exists.
    dotenvy::from_filename(env_file.as_deref().unwrap_or(".env")).ok();
    // Return an error rather than panicking, since this function already reports
    // failures through `Result`.
    let provider = env::var("CLOUD_PROVIDER")
        .map_err(|_| "CLOUD_PROVIDER must be set")?
        .to_lowercase();
    match provider.as_str() {
        "local" => load_local(),
        #[cfg(feature = "aws")]
        "aws" => load_aws(),
        #[cfg(feature = "azure")]
        "azure" => load_azure(),
        _ => Err(format!("Unknown CLOUD_PROVIDER: '{}'", provider).into()),
    }
}
/// Runs a standalone garbage collector against the database at `path`.
///
/// Builds the manifest store and table store (default SST format, and `None` as the
/// last `TableStore::new` argument, as before). It then constructs the collector
/// with `gc_opts`, registers its interrupt handler, and awaits `await_shutdown`
/// before returning.
pub async fn run_gc_instance(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    gc_opts: GarbageCollectorOptions,
) -> Result<(), Box<dyn Error>> {
    let manifest_store = Arc::new(ManifestStore::new(path, object_store.clone()));
    // `object_store` and the SST format are not used after this point, so they are
    // moved in rather than cloned.
    let table_store = Arc::new(TableStore::new(
        object_store,
        SsTableFormat::default(),
        path.clone(),
        None,
    ));
    let tokio_handle = Handle::current();
    let stats = Arc::new(StatRegistry::new());
    let collector = GarbageCollector::new(
        manifest_store,
        table_store,
        gc_opts,
        tokio_handle,
        stats,
        |_| {},
    )
    .await;
    collector.register_interrupt_handler();
    collector.await_shutdown().await;
    Ok(())
}
/// Builds a local-filesystem object store rooted at the `LOCAL_PATH` env variable.
///
/// # Errors
///
/// Returns an error when `LOCAL_PATH` is unset or not valid Unicode, or when the
/// filesystem store cannot be created for that path.
pub fn load_local() -> Result<Arc<dyn ObjectStore>, Box<dyn Error>> {
    // Report a missing variable as an error instead of panicking; the signature
    // already returns `Result`.
    let local_path = env::var("LOCAL_PATH").map_err(|_| "LOCAL_PATH must be set")?;
    let lfs = object_store::local::LocalFileSystem::new_with_prefix(local_path)?;
    Ok(Arc::new(lfs) as Arc<dyn ObjectStore>)
}
#[cfg(feature = "aws")]
/// Builds an S3 object store from `AWS_*` environment variables.
///
/// Required: `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, `AWS_BUCKET`, `AWS_REGION`.
/// Optional:
/// - `AWS_SESSION_TOKEN`: passed to the builder as a session token.
/// - `AWS_DYNAMODB_TABLE`: when set, conditional puts use a DynamoDB commit on that
///   table; otherwise they use ETag matching.
/// - `AWS_ENDPOINT`: when set, overrides the endpoint and also allows plain HTTP.
///
/// # Errors
///
/// Returns an error when a required variable is unset, or when the S3 builder
/// rejects the configuration.
pub fn load_aws() -> Result<Arc<dyn ObjectStore>, Box<dyn Error>> {
    use object_store::aws::{DynamoCommit, S3ConditionalPut};

    // Missing required variables become errors rather than panics.
    let key = env::var("AWS_ACCESS_KEY_ID").map_err(|_| "AWS_ACCESS_KEY_ID must be set")?;
    let secret = env::var("AWS_SECRET_ACCESS_KEY")
        .map_err(|_| "AWS_SECRET_ACCESS_KEY must be set")?;
    let session_token = env::var("AWS_SESSION_TOKEN").ok();
    let bucket = env::var("AWS_BUCKET").map_err(|_| "AWS_BUCKET must be set")?;
    let region = env::var("AWS_REGION").map_err(|_| "AWS_REGION must be set")?;
    let endpoint = env::var("AWS_ENDPOINT").ok();
    let dynamodb_table = env::var("AWS_DYNAMODB_TABLE").ok();

    let conditional_put = match dynamodb_table {
        Some(table) => S3ConditionalPut::Dynamo(DynamoCommit::new(table)),
        None => S3ConditionalPut::ETagMatch,
    };

    let mut builder = object_store::aws::AmazonS3Builder::new()
        .with_access_key_id(key)
        .with_secret_access_key(secret)
        .with_bucket_name(bucket)
        .with_region(region)
        .with_conditional_put(conditional_put);
    if let Some(token) = session_token {
        builder = builder.with_token(token);
    }
    if let Some(endpoint) = endpoint {
        // Custom endpoints are allowed over plain HTTP.
        builder = builder.with_allow_http(true).with_endpoint(endpoint);
    }
    Ok(Arc::new(builder.build()?) as Arc<dyn ObjectStore>)
}
#[cfg(feature = "azure")]
/// Builds an Azure Blob object store from `AZURE_ACCOUNT`, `AZURE_KEY`, and
/// `AZURE_CONTAINER`.
///
/// # Errors
///
/// Returns an error when any of the three variables is unset, or when the Azure
/// builder rejects the configuration.
pub fn load_azure() -> Result<Arc<dyn ObjectStore>, Box<dyn Error>> {
    // Missing required variables become errors rather than panics.
    let account = env::var("AZURE_ACCOUNT").map_err(|_| "AZURE_ACCOUNT must be set")?;
    let key = env::var("AZURE_KEY").map_err(|_| "AZURE_KEY must be set")?;
    let container = env::var("AZURE_CONTAINER").map_err(|_| "AZURE_CONTAINER must be set")?;
    let builder = object_store::azure::MicrosoftAzureBuilder::new()
        .with_account(account)
        .with_access_key(key)
        .with_container_name(container);
    Ok(Arc::new(builder.build()?) as Arc<dyn ObjectStore>)
}
/// Creates a checkpoint for the database at `path` using `options`.
///
/// Loads the stored manifest, writes a new checkpoint into it, and reports the new
/// checkpoint's id together with its `manifest_id`.
///
/// # Errors
///
/// Propagates failures from loading the manifest or writing the checkpoint.
pub async fn create_checkpoint<P: Into<Path>>(
    path: P,
    object_store: Arc<dyn ObjectStore>,
    options: &CheckpointOptions,
) -> Result<CheckpointCreateResult, SlateDBError> {
    let db_path: Path = path.into();
    let store = Arc::new(ManifestStore::new(&db_path, object_store));
    let mut manifest = StoredManifest::load(store).await?;
    let written = manifest.write_checkpoint(None, options).await?;
    let result = CheckpointCreateResult {
        id: written.id,
        manifest_id: written.manifest_id,
    };
    Ok(result)
}
/// Creates a clone database at `clone_path` from the database at `parent_path`.
///
/// `parent_checkpoint` is forwarded to `clone::create_clone` unchanged, together
/// with a fresh, empty `FailPointRegistry`.
///
/// # Errors
///
/// Propagates any error returned by `clone::create_clone`.
pub async fn create_clone<P: Into<Path>>(
    clone_path: P,
    parent_path: P,
    object_store: Arc<dyn ObjectStore>,
    parent_checkpoint: Option<Uuid>,
) -> Result<(), Box<dyn Error>> {
    let fail_points = Arc::new(FailPointRegistry::new());
    clone::create_clone(
        clone_path,
        parent_path,
        object_store,
        parent_checkpoint,
        fail_points,
    )
    .await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use object_store::{memory::InMemory, path::Path};

    /// Writes `bytes` at `location` in `store`, failing the test if the write fails.
    async fn put_object(store: &InMemory, location: &str, bytes: Vec<u8>) {
        store
            .put(&Path::from(location), bytes.into())
            .await
            .unwrap();
    }

    /// Seeds the three-object fixture shared by the whole-store deletion tests.
    async fn seed_scattered_objects(store: &InMemory) {
        put_object(store, "test/prefix/object1", vec![1, 2, 3]).await;
        put_object(store, "other/path/object2", vec![4, 5, 6]).await;
        put_object(store, "root_object", vec![7, 8, 9]).await;
    }

    /// Returns the locations that remain under `prefix` (all objects when `None`).
    async fn remaining_locations(store: &InMemory, prefix: Option<&Path>) -> Vec<Path> {
        store
            .list(prefix)
            .map_ok(|meta| meta.location)
            .try_collect::<Vec<Path>>()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_delete_objects_with_prefix_empty() {
        let store = Arc::new(InMemory::new());
        let outcome = delete_objects_with_prefix(store, Some(&Path::from("test/prefix"))).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_delete_objects_with_prefix_single_object() {
        let store = Arc::new(InMemory::new());
        put_object(&store, "test/prefix/object1", vec![1, 2, 3]).await;
        let prefix = Path::from("test/prefix");
        let outcome = delete_objects_with_prefix(store.clone(), Some(&prefix)).await;
        assert!(outcome.is_ok());
        assert!(remaining_locations(&store, Some(&prefix)).await.is_empty());
    }

    #[tokio::test]
    async fn test_delete_objects_with_prefix_multiple_objects() {
        let store = Arc::new(InMemory::new());
        put_object(&store, "test/prefix/object1", vec![1, 2, 3]).await;
        put_object(&store, "test/prefix/object2", vec![4, 5, 6]).await;
        put_object(&store, "test/other/object3", vec![7, 8, 9]).await;
        let outcome =
            delete_objects_with_prefix(store.clone(), Some(&Path::from("test/prefix"))).await;
        assert!(outcome.is_ok());
        assert_eq!(
            remaining_locations(&store, None).await,
            vec![Path::from("test/other/object3")]
        );
    }

    #[tokio::test]
    async fn test_delete_objects_with_empty_prefix() {
        let store = Arc::new(InMemory::new());
        seed_scattered_objects(&store).await;
        let outcome = delete_objects_with_prefix(store.clone(), Some(&Path::from(""))).await;
        assert!(outcome.is_ok());
        assert!(
            remaining_locations(&store, None).await.is_empty(),
            "Expected all objects to be deleted with empty prefix"
        );
    }

    #[tokio::test]
    async fn test_delete_objects_with_root_prefix() {
        let store = Arc::new(InMemory::new());
        seed_scattered_objects(&store).await;
        let outcome = delete_objects_with_prefix(store.clone(), Some(&Path::from("/"))).await;
        assert!(outcome.is_ok());
        assert!(
            remaining_locations(&store, None).await.is_empty(),
            "Expected all objects to be deleted with '/' prefix"
        );
    }

    #[tokio::test]
    async fn test_delete_objects_with_none_prefix() {
        let store = Arc::new(InMemory::new());
        seed_scattered_objects(&store).await;
        let outcome = delete_objects_with_prefix(store.clone(), None).await;
        assert!(outcome.is_ok());
        assert!(
            remaining_locations(&store, None).await.is_empty(),
            "Expected all objects to be deleted with None prefix"
        );
    }
}