use std::io::{BufWriter, Write};
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
mod ftp_server;
mod sftp_server;
use crate::FileStore;
use crate::tests::ftp_server::{random_tls_certificate, start_temp_ftp_server};
use crate::tests::sftp_server::start_temp_sftp_server;
const TEMP_BODY_A: &[u8] = b"temporary file string";
fn init() {
let _ = env_logger::builder()
.filter_module("suppaftp", log::LevelFilter::Warn)
.filter_module("libunftp", log::LevelFilter::Warn)
.filter_level(log::LevelFilter::Debug)
.is_test(true).try_init();
}
#[tokio::test]
async fn test_azure() {
init();
let url = "azure://alpytest.blob.core.windows.net/pytest/";
let fs = FileStore::with_limit_retries(url).await.unwrap();
println!("{fs:?}");
assert!(fs.get("__missing_file__").await.unwrap().is_none());
assert!(fs.exists("test").await.unwrap());
assert!(fs.get("test").await.unwrap().is_some());
assert!(fs.put("bob", &Bytes::copy_from_slice(b"bob")).await.is_err());
}
#[tokio::test(flavor = "multi_thread")]
async fn test_azure_emulator() {
init();
let url = "azure://localhost/?emulator=true&allow_directory_access=true";
let fs = FileStore::with_limit_retries(url).await.unwrap();
println!("{fs:?}");
common_actions(fs.clone()).await;
big_file(fs).await;
read_only(url.to_string()).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_sftp() {
init();
let server = start_temp_sftp_server().await;
let fs = FileStore::with_limit_retries(&server).await.unwrap();
common_actions(fs).await;
read_only(server).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn ftp() {
init();
let temp_ftp_server = start_temp_ftp_server(None).await;
let url = format!("ftp://{temp_ftp_server}");
let fs = FileStore::with_limit_retries(&url).await.unwrap();
common_actions(fs.clone()).await;
big_file(fs).await;
read_only(url).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn ftps() {
init();
let certs = random_tls_certificate().unwrap();
let temp_ftps_server = start_temp_ftp_server(Some(certs)).await;
let url = format!("ftps://{temp_ftps_server}");
let fs = FileStore::with_limit_retries(&url).await.unwrap();
common_actions(fs.clone()).await;
big_file(fs).await;
read_only(url).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn ftp_parallel() {
init();
let temp_ftp_server = start_temp_ftp_server(None).await;
let fs = FileStore::open(&[format!("ftp://{temp_ftp_server}")]).await.unwrap();
parallel_activity(fs).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn ftps_parallel() {
init();
let certs = random_tls_certificate().unwrap();
let temp_ftps_server = start_temp_ftp_server(Some(certs)).await;
let fs = FileStore::open(&[format!("ftps://{temp_ftps_server}")]).await.unwrap();
parallel_activity(fs).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_file() {
let directory = tempfile::tempdir().unwrap();
let url = format!("file://{}", directory.path().to_string_lossy());
println!("{url}");
let fs = FileStore::with_limit_retries(&url).await.unwrap();
common_actions(fs.clone()).await;
big_file(fs).await;
read_only(url).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_s3() {
let content = Bytes::copy_from_slice(b"THIS IS A MINIO TEST");
let url = "s3://al_storage_key:Ch@ngeTh!sPa33w0rd@localhost:9000/?s3_bucket=test&use_ssl=False";
let fs = FileStore::with_limit_retries(url).await.unwrap();
assert!(fs.delete("al4_minio_pytest.txt").await.is_ok());
assert!(fs.put("al4_minio_pytest.txt", &content).await.is_ok());
assert!(fs.exists("al4_minio_pytest.txt").await.unwrap());
assert_eq!(fs.get("al4_minio_pytest.txt").await.unwrap().unwrap(), content);
assert!(fs.delete("al4_minio_pytest.txt").await.is_ok());
common_actions(fs.clone()).await;
big_file(fs).await;
read_only(url.to_string()).await;
}
#[tokio::test(flavor = "multi_thread")]
async fn test_s3_retry() {
assert!(FileStore::with_limit_retries("s3://al_storage_key:Ch@ngeTh!sPa33w0rd@localhost:9111/?s3_bucket=test&use_ssl=False").await.is_err());
let timeout = tokio::time::timeout(
Duration::from_secs(5),
FileStore::open(&["s3://al_storage_key:Ch@ngeTh!sPa33w0rd@localhost:9111/?s3_bucket=test&use_ssl=False".to_string()])
).await;
assert!(timeout.is_err());
}
async fn common_actions(fs: Arc<FileStore>) {
let temp_dir = tempfile::tempdir().unwrap();
assert!(!fs.exists("__missing_file__").await.unwrap());
assert!(fs.get("__missing_file__").await.unwrap().is_none());
assert!(fs.download("__missing_file__", &temp_dir.path().join("local_copy")).await.is_err());
assert!(fs.stream("__missing_file__").await.is_err());
assert!(fs.upload(&temp_dir.path().join("__missing_file__"), "not-to-be-created").await.is_err());
assert!(!fs.exists("not-to-be-created").await.unwrap());
fs.delete("__missing_file__").await.unwrap();
let temp_body_a = Bytes::copy_from_slice(TEMP_BODY_A);
assert!(fs.put("put", &temp_body_a).await.is_ok());
assert_eq!(fs.get("put").await.unwrap().unwrap(), TEMP_BODY_A);
{
let temp_file_a = temp_dir.path().join("a");
tokio::fs::write(&temp_file_a, TEMP_BODY_A).await.unwrap();
let temp_file_b = temp_dir.path().join("b");
tokio::fs::write(&temp_file_b, TEMP_BODY_A).await.unwrap();
let failures = fs.upload_batch(&[
(&temp_file_a, "upload/a"),
(&temp_file_b, "upload/b")
]).await;
assert!(failures.is_empty(), "{failures:?}");
assert!(fs.exists("upload/a").await.unwrap());
assert!(fs.exists("upload/b").await.unwrap());
let temp_file_name = temp_dir.path().join("scratch");
assert!(!temp_file_name.exists());
fs.download("upload/b", &temp_file_name).await.unwrap();
assert_eq!(tokio::fs::read(temp_file_name).await.unwrap(), TEMP_BODY_A);
}
assert!(fs.exists("put").await.unwrap());
fs.delete("put").await.unwrap();
fs.delete("put").await.unwrap();
assert!(!fs.exists("put").await.unwrap());
fs.put("../illigal-file", &temp_body_a).await.unwrap();
assert!(fs.exists("illigal-file").await.unwrap());
}
async fn big_file(fs: Arc<FileStore>) {
let big_file = tokio::task::spawn_blocking(|| {
let big_file = tempfile::NamedTempFile::new().unwrap();
let mut writer = BufWriter::new(big_file);
for _ in 0..10_000 {
for ii in 0u64..2000 {
writer.write_all(&ii.to_le_bytes()).unwrap();
}
}
writer.into_inner().unwrap()
}).await.unwrap();
fs.upload(big_file.path(), "big_file").await.unwrap();
let big_copy = tempfile::NamedTempFile::new().unwrap();
fs.download("big_file", big_copy.path()).await.unwrap();
let data = tokio::fs::read(big_file.path()).await.unwrap();
assert!(data.len() > 1 << 25);
assert_eq!(tokio::fs::read(big_copy.path()).await.unwrap(), data);
}
async fn parallel_activity(fs: Arc<FileStore>) {
let mut file_names = vec![];
for _ in 0..1000 {
file_names.push(uuid::Uuid::new_v4().to_string());
}
let file_names = Arc::new(file_names);
let mut handles = vec![];
for _ in 0..10 {
let file_names = file_names.clone();
let fs = fs.clone();
handles.push(tokio::spawn(async move {
for _ in 0..1_000 {
let index = rand::random_range(0..file_names.len());
fs.put(&file_names[index], &Bytes::copy_from_slice(file_names[index].as_bytes())).await.unwrap()
}
}))
}
for _ in 0..10 {
let file_names = file_names.clone();
let fs = fs.clone();
handles.push(tokio::spawn(async move {
'outer: for _ in 0..1_000 {
let index = rand::random_range(0..file_names.len());
for _ in 0..10 {
if let Some(data) = fs.get(&file_names[index]).await.unwrap() {
if data == Bytes::copy_from_slice(file_names[index].as_bytes()) {
continue 'outer
}
} else {
continue 'outer
}
}
panic!()
}
}))
}
for handle in handles {
handle.await.unwrap();
println!("finish");
}
}
async fn read_only(url: String) {
use url::Url;
let fs = FileStore::with_limit_retries(&url).await.unwrap();
assert!(fs.put("test", &Bytes::copy_from_slice(b"test")).await.is_ok());
let mut ro_url = Url::parse(&url).unwrap();
ro_url.query_pairs_mut().append_pair("read_only", "true");
let fs_ro = FileStore::with_limit_retries(&ro_url.as_str()).await.unwrap();
assert!(fs.get("__missing_file__").await.unwrap().is_none());
assert!(fs_ro.exists("test").await.unwrap());
assert!(fs_ro.get("test").await.unwrap().is_some());
assert!(fs_ro.put("bob", &Bytes::copy_from_slice(b"bob")).await.is_ok());
assert!(!fs_ro.exists("bob").await.unwrap());
assert!(fs_ro.delete("test").await.is_ok());
assert!(fs_ro.exists("test").await.unwrap());
}