use std::sync::Arc;
use opendal::Operator;
use tempfile::TempDir;
use uuid::Uuid;
/// Bundle of OpenDAL operators used by tests: a filesystem-backed operator,
/// an in-memory operator, and (optionally) a GCS-backed operator.
///
/// Construct it with `OpendalTestOperators::new()`; the `Drop` implementation
/// in this file schedules removal of the GCS test root, if one was created.
pub struct OpendalTestOperators {
/// Operator rooted at the temporary directory held in `fs_tmp_dir`.
pub fs_operator: Operator,
// Never read directly, hence the lint allowance. The `TempDir` guard is kept
// so the directory backing `fs_operator` outlives the operator; per the
// `tempfile` crate, the directory is removed when the guard is dropped.
#[allow(dead_code)]
fs_tmp_dir: Arc<TempDir>,
/// GCS-backed operator, or `None` when `get_gcs_operator` returned `None`
/// (i.e. the required environment variables were not set).
pub gcs_operator: Option<Operator>,
/// Operator backed by the OpenDAL `Memory` service.
pub memory_operator: Operator,
}
impl OpendalTestOperators {
    /// Creates the filesystem, GCS (when configured) and memory operators.
    ///
    /// The GCS operator is requested with a dedicated test root, so each
    /// instance works under its own prefix. It is `None` when
    /// `get_gcs_operator` cannot find its configuration.
    ///
    /// # Panics
    ///
    /// Panics if any of the underlying operator constructors panics.
    pub fn new() -> Self {
        // Same construction order as the struct fields: fs, then GCS, then memory.
        let (fs_operator, tmp_dir) = get_fs_operator();
        let gcs_operator = get_gcs_operator(true);
        let memory_operator = get_memory_operator();

        Self {
            fs_operator,
            fs_tmp_dir: Arc::new(tmp_dir),
            gcs_operator,
            memory_operator,
        }
    }

    /// Returns every available operator paired with its scheme.
    ///
    /// The order is filesystem, memory, then GCS (only when available), so the
    /// result has two or three entries.
    pub fn operators(&self) -> Vec<(opendal::Scheme, Operator)> {
        // Pair an operator handle with the scheme it reports about itself.
        let tagged = |op: &Operator| (op.info().scheme(), op.clone());

        let mut all = vec![tagged(&self.fs_operator), tagged(&self.memory_operator)];
        all.extend(self.gcs_operator.as_ref().map(tagged));
        all
    }

    /// Reports whether a GCS operator was created for this instance.
    pub fn is_gcs_available(&self) -> bool {
        self.gcs_operator.is_some()
    }
}
impl Drop for OpendalTestOperators {
    /// Best-effort removal of this instance's GCS test root.
    ///
    /// Does nothing when no GCS operator was created. Otherwise the root of
    /// the GCS operator is deleted through a fresh, un-rooted operator obtained
    /// from `get_gcs_operator(false)`.
    ///
    /// * Inside a Tokio runtime the cleanup is handed to the runtime's blocking
    ///   pool and is fire-and-forget: `drop` returns immediately.
    /// * Outside a Tokio runtime the cleanup runs on a helper thread that is
    ///   joined before `drop` returns.
    ///
    /// Failures are reported on stderr and never panic in `drop` itself, since
    /// a panic while already unwinding would abort the process.
    fn drop(&mut self) {
        let test_root_dir = match &self.gcs_operator {
            Some(operator) => operator.info().root(),
            None => return,
        };

        let cleanup = move || {
            let base_gcs_operator = match get_gcs_operator(false) {
                Some(operator) => operator,
                None => {
                    eprintln!(
                        "Error deleting GCS root directory: GCS operator is no longer available"
                    );
                    return;
                }
            };
            // A dedicated runtime drives the async delete to completion on this
            // thread, independently of whatever runtime the caller was using.
            let rt = match tokio::runtime::Runtime::new() {
                Ok(rt) => rt,
                Err(e) => {
                    eprintln!("Error deleting GCS root directory: {}", e);
                    return;
                }
            };
            rt.block_on(async {
                if let Err(e) = base_gcs_operator.remove_all(&test_root_dir).await {
                    eprintln!("Error deleting GCS root directory: {}", e);
                }
            });
        };

        // `tokio::task::spawn_blocking` panics without a runtime context, so
        // probe for one first and fall back to a plain OS thread.
        match tokio::runtime::Handle::try_current() {
            Ok(handle) => {
                handle.spawn_blocking(cleanup);
            }
            Err(_) => {
                // No async executor is being blocked here, so waiting is safe
                // and makes the cleanup deterministic.
                if std::thread::spawn(cleanup).join().is_err() {
                    eprintln!("Error deleting GCS root directory: cleanup thread panicked");
                }
            }
        }
    }
}
pub(crate) fn get_fs_operator() -> (Operator, TempDir) {
let tmp_dir = tempfile::tempdir().unwrap();
let s = tmp_dir.path().to_str().unwrap();
let builder = opendal::services::Fs::default().root(s);
let operator = opendal::Operator::new(builder).unwrap().finish();
(operator, tmp_dir)
}
pub(crate) fn get_gcs_operator(test_root_dir: bool) -> Option<Operator> {
let credential_path = match std::env::var("GOOGLE_APPLICATION_CREDENTIALS") {
Ok(path) => path,
Err(_) => return None,
};
let bucket_name = match std::env::var("GCS_BUCKET") {
Ok(path) => path,
Err(_) => return None,
};
let mut builder = opendal::services::Gcs::default()
.bucket(&bucket_name)
.credential_path(&credential_path);
if test_root_dir {
builder = builder.root(&format!("test_{}", Uuid::new_v4()));
}
let operator = opendal::Operator::new(builder).unwrap().finish();
Some(operator)
}
pub(crate) fn get_memory_operator() -> Operator {
let builder = opendal::services::Memory::default();
opendal::Operator::new(builder).unwrap().finish()
}
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use opendal::Buffer;

    use super::*;

    /// The operator list has three entries when GCS is configured and two
    /// otherwise.
    #[tokio::test]
    async fn test_operator_test_providers() {
        let providers = OpendalTestOperators::new();
        let operators = providers.operators();

        let gcs_available = providers.is_gcs_available();
        let expected = if gcs_available { 3 } else { 2 };
        assert_eq!(
            operators.len(),
            expected,
            "Expected {} operators, got {}",
            expected,
            operators.len()
        );

        if gcs_available {
            println!("GCS operator is available");
        } else {
            println!("GCS operator is NOT available");
        }
    }

    /// Dropping `OpendalTestOperators` removes the GCS test root it created.
    ///
    /// The test is a no-op when GCS is not configured.
    #[tokio::test]
    async fn test_gcs_cleanup() {
        // Upper bound on how long the background cleanup may take:
        // MAX_ATTEMPTS * POLL_INTERVAL.
        const MAX_ATTEMPTS: u32 = 20;
        const POLL_INTERVAL: Duration = Duration::from_millis(500);

        let test_root_dir = {
            let operators = OpendalTestOperators::new();
            if !operators.is_gcs_available() {
                return;
            }
            let gcs_operator = operators.gcs_operator.as_ref().unwrap().clone();
            gcs_operator
                .write("test.txt", Buffer::from("test"))
                .await
                .unwrap();
            gcs_operator.info().root()
            // `operators` is dropped here, which schedules the cleanup.
        };

        let base_gcs_operator = get_gcs_operator(false).unwrap();

        // The cleanup runs in the background, so poll for its effect rather
        // than relying on a single fixed sleep that races network latency.
        let mut exists = true;
        for _ in 0..MAX_ATTEMPTS {
            tokio::time::sleep(POLL_INTERVAL).await;
            exists = base_gcs_operator.exists(&test_root_dir).await.unwrap();
            if !exists {
                break;
            }
        }

        assert!(!exists, "Test root directory should not exist anymore as it should have been deleted by the Drop impl");
    }
}