use crate::args::{parse_args, CliArgs, CliCommands, GcResource, GcSchedule};
use object_store::path::Path;
use object_store::ObjectStore;
use slatedb::admin;
use slatedb::admin::{list_checkpoints, list_manifests, read_manifest, run_gc_instance};
use slatedb::config::GcExecutionMode::{Once, Periodic};
use slatedb::config::{
CheckpointOptions, GarbageCollectorDirectoryOptions, GarbageCollectorOptions,
};
use slatedb::Db;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
mod args;
/// CLI entry point.
///
/// Initializes tracing output, parses the command line, loads the object
/// store via `admin::load_object_store_from_env`, and dispatches to the
/// handler matching the selected subcommand. Any error from loading the
/// store or from the handler is returned to the caller.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    tracing_subscriber::fmt::init();

    let cli: CliArgs = parse_args();
    let db_path = Path::from(args_path(&cli));
    let store = admin::load_object_store_from_env(cli.env_file)?;

    // Every handler returns `Result<(), Box<dyn Error>>`, so the selected
    // handler's result is the program's result.
    match cli.command {
        CliCommands::ReadManifest { id } => exec_read_manifest(&db_path, store, id).await,
        CliCommands::ListManifests { start, end } => {
            exec_list_manifest(&db_path, store, start, end).await
        }
        CliCommands::CreateCheckpoint { lifetime, source } => {
            exec_create_checkpoint(&db_path, store, lifetime, source).await
        }
        CliCommands::RefreshCheckpoint { id, lifetime } => {
            exec_refresh_checkpoint(&db_path, store, id, lifetime).await
        }
        CliCommands::DeleteCheckpoint { id } => exec_delete_checkpoint(&db_path, store, id).await,
        CliCommands::ListCheckpoints {} => exec_list_checkpoints(&db_path, store).await,
        CliCommands::RunGarbageCollection { resource, min_age } => {
            exec_gc_once(&db_path, store, resource, min_age).await
        }
        CliCommands::ScheduleGarbageCollection {
            manifest,
            wal,
            compacted,
        } => schedule_gc(&db_path, store, manifest, wal, compacted).await,
    }
}

/// Borrows the database path string from the parsed CLI arguments.
fn args_path(cli: &CliArgs) -> &str {
    cli.path.as_str()
}
/// Reads a manifest via `read_manifest` and prints it to stdout.
///
/// `id` is forwarded unchanged to `read_manifest`. When that call yields
/// `None`, a "not found" message is printed instead of a manifest.
///
/// # Errors
///
/// Propagates any error returned by `read_manifest`.
async fn exec_read_manifest(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    id: Option<u64>,
) -> Result<(), Box<dyn Error>> {
    let maybe_manifest = read_manifest(path, object_store, id).await?;
    if let Some(manifest) = maybe_manifest {
        println!("{}", manifest);
    } else {
        println!("No manifest file found.")
    }
    Ok(())
}
/// Prints the output of `list_manifests` for the id range `start..end`.
///
/// The range is half-open. A missing `start` defaults to `u64::MIN` and a
/// missing `end` defaults to `u64::MAX`.
///
/// # Errors
///
/// Propagates any error returned by `list_manifests`.
async fn exec_list_manifest(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    start: Option<u64>,
    end: Option<u64>,
) -> Result<(), Box<dyn Error>> {
    let lower = start.unwrap_or(u64::MIN);
    let upper = end.unwrap_or(u64::MAX);
    let listing = list_manifests(path, object_store, lower..upper).await?;
    println!("{}", listing);
    Ok(())
}
/// Creates a checkpoint via `admin::create_checkpoint` and prints the
/// returned value using its `Debug` representation.
///
/// `lifetime` and `source` are passed through as the fields of
/// `CheckpointOptions`.
///
/// # Errors
///
/// Propagates any error returned by `admin::create_checkpoint`.
async fn exec_create_checkpoint(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    lifetime: Option<Duration>,
    source: Option<Uuid>,
) -> Result<(), Box<dyn Error>> {
    let options = CheckpointOptions { lifetime, source };
    // `create_checkpoint` takes the path by value, hence the clone.
    let created = admin::create_checkpoint(path.clone(), object_store, &options).await?;
    println!("{:?}", created);
    Ok(())
}
/// Calls `Db::refresh_checkpoint` for checkpoint `id` with the given
/// `lifetime` and prints the result using its `Debug` representation.
///
/// # Errors
///
/// Propagates any error returned by `Db::refresh_checkpoint`.
async fn exec_refresh_checkpoint(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    id: Uuid,
    lifetime: Option<Duration>,
) -> Result<(), Box<dyn Error>> {
    let refreshed = Db::refresh_checkpoint(path, object_store, id, lifetime).await?;
    println!("{:?}", refreshed);
    Ok(())
}
/// Calls `Db::delete_checkpoint` for checkpoint `id` and prints the result
/// using its `Debug` representation.
///
/// # Errors
///
/// Propagates any error returned by `Db::delete_checkpoint`.
async fn exec_delete_checkpoint(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    id: Uuid,
) -> Result<(), Box<dyn Error>> {
    let deleted = Db::delete_checkpoint(path, object_store, id).await?;
    println!("{:?}", deleted);
    Ok(())
}
/// Fetches checkpoints via `list_checkpoints`, serializes the result to JSON
/// with `serde_json::to_string`, and prints it to stdout.
///
/// # Errors
///
/// Propagates errors from `list_checkpoints` and from JSON serialization.
async fn exec_list_checkpoints(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
) -> Result<(), Box<dyn Error>> {
    let checkpoints = list_checkpoints(path, object_store).await?;
    let rendered = serde_json::to_string(&checkpoints)?;
    println!("{rendered}");
    Ok(())
}
/// Invokes `run_gc_instance` configured for exactly one resource kind.
///
/// Only the option slot selected by `resource` is populated, with execution
/// mode `Once` and the supplied `min_age`. The other two directory options
/// are `None`, and any remaining fields come from
/// `GarbageCollectorOptions::default()`.
///
/// # Errors
///
/// Propagates any error returned by `run_gc_instance`.
async fn exec_gc_once(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    resource: GcResource,
    min_age: Duration,
) -> Result<(), Box<dyn Error>> {
    let once_opts = GarbageCollectorDirectoryOptions {
        execution_mode: Once,
        min_age,
    };

    // Start with every directory disabled, then enable just the requested one.
    let mut gc_opts = GarbageCollectorOptions {
        manifest_options: None,
        wal_options: None,
        compacted_options: None,
        ..GarbageCollectorOptions::default()
    };
    let selected = match resource {
        GcResource::Manifest => &mut gc_opts.manifest_options,
        GcResource::Wal => &mut gc_opts.wal_options,
        GcResource::Compacted => &mut gc_opts.compacted_options,
    };
    *selected = Some(once_opts);

    run_gc_instance(path, object_store, gc_opts).await?;
    Ok(())
}
/// Invokes `run_gc_instance` with periodic settings for each resource kind
/// that has a schedule.
///
/// Each `Some(schedule)` becomes directory options with execution mode
/// `Periodic(schedule.period)` and `min_age` taken from the schedule. A
/// `None` schedule leaves that directory's options as `None`. Remaining
/// fields come from `GarbageCollectorOptions::default()`.
///
/// # Errors
///
/// Propagates any error returned by `run_gc_instance`.
async fn schedule_gc(
    path: &Path,
    object_store: Arc<dyn ObjectStore>,
    manifest_schedule: Option<GcSchedule>,
    wal_schedule: Option<GcSchedule>,
    compacted_schedule: Option<GcSchedule>,
) -> Result<(), Box<dyn Error>> {
    /// Converts a CLI schedule into periodic directory options.
    fn periodic_opts(schedule: GcSchedule) -> GarbageCollectorDirectoryOptions {
        GarbageCollectorDirectoryOptions {
            execution_mode: Periodic(schedule.period),
            min_age: schedule.min_age,
        }
    }

    let manifest_options = manifest_schedule.map(periodic_opts);
    let wal_options = wal_schedule.map(periodic_opts);
    let compacted_options = compacted_schedule.map(periodic_opts);

    let gc_opts = GarbageCollectorOptions {
        manifest_options,
        wal_options,
        compacted_options,
        ..GarbageCollectorOptions::default()
    };
    run_gc_instance(path, object_store, gc_opts).await?;
    Ok(())
}