use std::path::{Path, PathBuf};
use std::sync::Arc;
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use log::{info, warn};
use crate::transport::Transport;
use crate::transport::ftp::TransportFtp;
use crate::transport::local::LocalTransport;
use crate::transport::sftp::{SftpParameters, TransportSftp};
/// A file store that fans operations out over one or more [`Transport`] backends.
///
/// Write operations (`put`, `upload`, `delete`) are sent to every transport that is
/// not marked read-only. Read operations (`exists`, `get`, `download`, `stream`) try
/// the transports in order and use the first one that succeeds.
#[derive(Debug)]
pub struct FileStore {
    transports: Vec<Box<dyn Transport>>,
}

impl FileStore {
    /// Open a store backed by one transport per URL in `urls`, in the given order.
    ///
    /// Transports are created with no connection-attempt limit override (`None`).
    ///
    /// # Errors
    /// Returns the first error raised while parsing a URL or constructing its transport.
    pub async fn open(urls: &[String]) -> Result<Arc<FileStore>> {
        let mut transports = Vec::with_capacity(urls.len());
        for url in urls {
            transports.push(Self::create_transport(url, None).await?);
        }
        // Count only the transports that will actually receive writes, so the log
        // line matches what it claims.
        let writable = transports.iter().filter(|transport| !transport.read_only()).count();
        info!("FileStore initialized with {writable} writable transport(s)");
        Ok(Arc::new(Self { transports }))
    }

    /// Open a store backed by a single transport whose connection attempts are limited to one.
    ///
    /// # Errors
    /// Returns an error if the URL cannot be parsed or the transport cannot be constructed.
    pub async fn with_limit_retries(url: &str) -> Result<Arc<FileStore>> {
        Ok(Arc::new(Self {
            transports: vec![Self::create_transport(url, Some(1)).await?],
        }))
    }

    /// Build a single transport from a filestore URL.
    ///
    /// Accepted schemes are `file`, `azure`, `s3`, `ftp`, `ftps` and `sftp`. The
    /// `read_only` query parameter applies to every scheme. Other query parameters are
    /// scheme-specific, and unrecognised ones are ignored. `connection_attempts` is
    /// forwarded to the network-backed transports.
    ///
    /// # Errors
    /// Fails if the URL does not parse, the scheme is not accepted, a required host is
    /// missing (or a host is given for `file`), or the transport constructor fails.
    async fn create_transport(address: &str, connection_attempts: Option<usize>) -> Result<Box<dyn Transport>> {
        let url: url::Url = address.parse()?;
        // An empty URL path is treated as the root.
        let mut base = match url.path() {
            "" => "/",
            other => other,
        }
        .to_string();
        let host = url.host_str();
        // A host of "." turns the path into a "./"-relative one.
        if host == Some(".") {
            base = format!(".{base}");
        }
        let port = url.port();
        // `Url` returns user-info components still percent-encoded; decode both so
        // credentials containing reserved characters (e.g. '@' or ':') arrive intact.
        let user = Self::decode_component(url.username());
        let password = url.password().map(Self::decode_component);
        let read_only = url
            .query_pairs()
            .find_map(|(name, value)| if name == "read_only" { Some(read_bool(&value)) } else { None })
            .unwrap_or(false);

        match url.scheme() {
            "file" => {
                if url.has_host() {
                    bail!("Local file connections can't specify a host.");
                }
                let path = base.parse()?;
                Ok(Box::new(LocalTransport::new(path, read_only)))
            }
            "azure" => {
                use crate::transport::azure::{AzureParameters, TransportAzure};
                let mut parameters = AzureParameters::default();
                for (name, value) in url.query_pairs() {
                    match name.as_ref() {
                        "allow_directory_access" => parameters.allow_directory_access = read_bool(&value),
                        "use_default_credentials" => parameters.use_default_credentials = read_bool(&value),
                        "emulator" => parameters.emulator = read_bool(&value),
                        "access_key" => parameters.access_key = value.to_string(),
                        "tenant_id" => parameters.tenant_id = value.to_string(),
                        "client_id" => parameters.client_id = value.to_string(),
                        "client_secret" => parameters.client_secret = value.to_string(),
                        _ => {}
                    }
                }
                let host = match host {
                    Some(host) => host.to_owned(),
                    None => bail!("a host must be provided for azure connections"),
                };
                // Azure receives the raw URL path, not the normalised `base` computed above.
                let base = url.path().to_owned();
                Ok(Box::new(TransportAzure::new(host, base, parameters, connection_attempts, read_only).await?))
            }
            "s3" => {
                use crate::transport::s3::{S3Parameters, TransportS3};
                let mut parameters = S3Parameters::default();
                for (name, value) in url.query_pairs() {
                    match name.as_ref() {
                        "use_ssl" => parameters.use_ssl = read_bool(&value),
                        "verify" => parameters.verify = read_bool(&value),
                        "boto_defaults" => parameters.boto_defaults = read_bool(&value),
                        "aws_region" => parameters.aws_region = Some(value.to_string()),
                        "s3_bucket" => parameters.s3_bucket = value.to_string(),
                        "compatability" => parameters.compatability = read_bool(&value),
                        _ => {}
                    }
                }
                let user = if user.is_empty() { None } else { Some(user) };
                Ok(Box::new(TransportS3::new(base, host.map(str::to_string), port, user, password, connection_attempts, parameters, read_only).await?))
            }
            "ftp" | "ftps" => {
                let host = match host {
                    Some(host) => host.to_owned(),
                    None => bail!("A host must be provided for ftp transport"),
                };
                let user = if user.is_empty() { None } else { Some(user) };
                if url.scheme().eq_ignore_ascii_case("ftps") {
                    Ok(Box::new(TransportFtp::new_secure(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
                } else {
                    Ok(Box::new(TransportFtp::new(connection_attempts, &base, host, port.unwrap_or(21), user, password, read_only).await?))
                }
            }
            "sftp" => {
                let host = match host {
                    Some(host) => host.to_owned(),
                    None => bail!("A host must be provided for sftp transport"),
                };
                let mut params = SftpParameters::default();
                for (name, value) in url.query_pairs() {
                    match name.as_ref() {
                        "private_key" => params.private_key = Some(value.to_string()),
                        "private_key_password" => params.private_key_password = Some(value.to_string()),
                        "validate_host" => params.validate_host = read_bool(&value),
                        _ => {}
                    }
                }
                Ok(Box::new(TransportSftp::new(base, host, password, user, port.unwrap_or(22), connection_attempts, params, read_only).await?))
            }
            _ => {
                bail!("Not an accepted filestore scheme: {}", url.scheme());
            }
        }
    }

    /// Percent-decode a URL component, replacing invalid UTF-8 with U+FFFD.
    fn decode_component(value: &str) -> String {
        percent_encoding::percent_decode_str(value).decode_utf8_lossy().into_owned()
    }

    /// Write `body` as `name` on every transport that is not read-only.
    ///
    /// # Errors
    /// Returns the first transport error. Transports after the failing one are not written.
    pub async fn put(&self, name: &str, body: &Bytes) -> Result<()> {
        for transport in &self.transports {
            if transport.read_only() {
                continue;
            }
            transport.put(name, body).await?;
        }
        Ok(())
    }

    /// Report whether any transport has `name`.
    ///
    /// Returns `Ok(true)` as soon as one transport reports the file. If none do,
    /// returns the last transport error (if any occurred), otherwise `Ok(false)`.
    pub async fn exists(&self, name: &str) -> Result<bool> {
        let mut last_error = None;
        for transport in &self.transports {
            match transport.exists(name).await {
                Ok(true) => return Ok(true),
                Ok(false) => continue,
                Err(err) => last_error = Some(err),
            }
        }
        match last_error {
            Some(error) => Err(error).context("Transport errors"),
            None => Ok(false),
        }
    }

    /// Fetch the contents of `name` from the first transport that has it.
    ///
    /// A transport that reports the file as missing (`Ok(None)`) does not stop the
    /// search; the next transport is tried. Transport errors are logged and also
    /// skipped.
    ///
    /// # Errors
    /// If no transport returned the file and at least one failed, the last error is
    /// returned. Otherwise the result is `Ok(None)`.
    pub async fn get(&self, name: &str) -> Result<Option<Vec<u8>>> {
        let mut last_error = None;
        for transport in &self.transports {
            match transport.get(name).await {
                Ok(Some(bytes)) => return Ok(Some(bytes)),
                // Missing here; a later transport may still hold it.
                Ok(None) => continue,
                Err(err) => {
                    warn!("error fetching blob [{name}] from transport {transport:?}: {err}");
                    last_error = Some(err);
                }
            }
        }
        match last_error {
            Some(error) => Err(error).context("All transports failed to fetch"),
            None => Ok(None),
        }
    }

    /// Download `name` to the local `path` using the first transport that succeeds.
    ///
    /// # Errors
    /// If every transport fails, the error message lists each failure, one per line.
    pub async fn download(&self, name: &str, path: &Path) -> Result<()> {
        let mut errors = vec![];
        for transport in &self.transports {
            match transport.download(name, path).await {
                Ok(()) => return Ok(()),
                Err(err) => {
                    errors.push(format!("Could not download file: [{name}] from {transport:?}: {err}"));
                }
            }
        }
        if errors.is_empty() {
            // Only reachable when the store has no transports at all.
            bail!("All transports failed to fetch [{name}]")
        }
        Err(anyhow::anyhow!(errors.join("\n")).context("All transports failed to fetch"))
    }

    /// Upload the local file at `path` as `name` to every writable transport.
    ///
    /// Every writable transport is attempted even if an earlier one fails.
    ///
    /// # Errors
    /// Returns the error from the last transport that failed, if any did.
    pub async fn upload(&self, path: &Path, name: &str) -> Result<()> {
        let mut last_error = None;
        for transport in &self.transports {
            if transport.read_only() {
                continue;
            }
            if let Err(err) = transport.upload(path, name).await {
                last_error = Some(err);
            }
        }
        match last_error {
            Some(error) => Err(error).context("A transport failed to upload"),
            None => Ok(()),
        }
    }

    /// Upload each `(local path, remote name)` pair in sequence.
    ///
    /// Returns the pairs that failed, each with its formatted (`{:?}`) error; an empty
    /// vector means every upload succeeded.
    pub async fn upload_batch(&self, local_remote_tuples: &[(&Path, &str)]) -> Vec<(PathBuf, String, String)> {
        let mut failed_tuples = vec![];
        for (src_path, dst_path) in local_remote_tuples {
            if let Err(error) = self.upload(src_path, dst_path).await {
                failed_tuples.push((src_path.to_path_buf(), dst_path.to_string(), format!("{error:?}")));
            }
        }
        failed_tuples
    }

    /// Open a stream for `name` on the first transport that can provide one.
    ///
    /// On success returns the `u64` reported by the transport together with the
    /// receiving end of a chunk channel.
    ///
    /// # Errors
    /// Returns the last transport error, or a generic error when there are no transports.
    pub async fn stream(&self, name: &str) -> Result<(u64, tokio::sync::mpsc::Receiver<Result<Bytes, std::io::Error>>)> {
        let mut last_error = None;
        for transport in &self.transports {
            match transport.stream(name).await {
                Ok((size, stream)) => return Ok((size, stream)),
                Err(err) => last_error = Some(err),
            }
        }
        match last_error {
            Some(err) => Err(err),
            None => bail!("No transports could stream file"),
        }
    }

    /// Delete `name` from every transport that is not read-only.
    ///
    /// # Errors
    /// Returns the first transport error. Transports after the failing one are untouched.
    pub async fn delete(&self, name: &str) -> Result<()> {
        for transport in &self.transports {
            if transport.read_only() {
                continue;
            }
            transport.delete(name).await?;
        }
        Ok(())
    }
}
/// Interpret a URL query flag.
///
/// `"1"` or `"true"` (in any ASCII letter case) is true; every other value,
/// including the empty string, is false.
fn read_bool(value: &str) -> bool {
    value == "1" || value.eq_ignore_ascii_case("true")
}