use std::{
collections::BTreeMap,
ffi::OsStr,
str::FromStr,
sync::{Arc, OnceLock},
vec::IntoIter,
};
use bytes::Bytes;
use bytesize::ByteSize;
use log::{error, trace, warn};
use opendal::{
Entry, Metadata,
blocking::{Operator, StdReader},
layers::{ConcurrentLimitLayer, LoggingLayer, RetryLayer, ThrottleLayer},
options::{ListOptions, ReadOptions},
};
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
use tokio::runtime::Runtime;
use typed_path::UnixPathBuf;
use rustic_core::{
ALL_FILE_TYPES, ErrorKind, FileType, Id, ReadBackend, ReadSource, ReadSourceEntry,
ReadSourceOpen, RusticError, RusticResult, WriteBackend,
repofile::{Node, NodeType},
};
/// Default values used by this backend.
mod constants {
/// Maximum number of retries used when the `retry` option is absent or set to `default`.
pub(super) const DEFAULT_RETRY: usize = 5;
}
/// Backend that performs all storage access through a blocking OpenDAL [`Operator`].
#[derive(Clone, Debug)]
pub struct OpenDALBackend {
/// The blocking operator every read, write, list and delete call goes through.
operator: Operator,
}
/// Returns the shared Tokio runtime, building it the first time it is requested.
///
/// The runtime lives in a function-local `static`, so every caller gets the same instance.
///
/// # Panics
///
/// Panics if the multi-threaded runtime cannot be built.
fn runtime() -> &'static Runtime {
    static RUNTIME: OnceLock<Runtime> = OnceLock::new();

    RUNTIME.get_or_init(|| {
        let mut builder = tokio::runtime::Builder::new_multi_thread();
        builder.enable_all();
        builder.build().unwrap()
    })
}
/// Throttling parameters: a bandwidth and a burst value, both stored as `u32`.
///
/// Values are produced from a `<bandwidth>,<burst>` string by the `FromStr` implementation,
/// where each part is a byte size such as `10kB` or `10MiB`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Throttle {
/// First value of the throttle string, converted to a byte count.
bandwidth: u32,
/// Second value of the throttle string, converted to a byte count.
burst: u32,
}
impl FromStr for Throttle {
    type Err = Box<RusticError>;

    /// Parses a throttle specification of the form `<bandwidth>,<burst>`.
    ///
    /// Each comma-separated part is trimmed, parsed as a [`ByteSize`] and narrowed to `u32`.
    /// Parts are evaluated lazily and only the first two are consumed, so anything after the
    /// second part is never inspected.
    ///
    /// # Errors
    ///
    /// * `ErrorKind::InvalidInput` if a consumed part is not a valid byte size.
    /// * `ErrorKind::Internal` if a byte size does not fit into a `u32`.
    /// * `ErrorKind::MissingInput` if the bandwidth or the burst part is missing.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        /// Turns one comma-separated part into a byte count that fits a `u32`.
        fn parse_part(part: &str) -> RusticResult<u32> {
            let bytesize = match ByteSize::from_str(part.trim()) {
                Ok(size) => size.as_u64(),
                Err(err) => {
                    // The untrimmed part is reported so the user sees exactly what was given.
                    return Err(RusticError::with_source(
                        ErrorKind::InvalidInput,
                        "Parsing ByteSize from throttle string `{string}` failed",
                        err,
                    )
                    .attach_context("string", part));
                }
            };

            u32::try_from(bytesize).map_err(|err| {
                RusticError::with_source(
                    ErrorKind::Internal,
                    "Converting ByteSize `{bytesize}` to u32 failed",
                    err,
                )
                .attach_context("bytesize", bytesize.to_string())
            })
        }

        let mut parts = s.split(',').map(parse_part);

        let Some(bandwidth) = parts.next().transpose()? else {
            return Err(RusticError::new(
                ErrorKind::MissingInput,
                "No bandwidth given.",
            ));
        };
        let Some(burst) = parts.next().transpose()? else {
            return Err(RusticError::new(
                ErrorKind::MissingInput,
                "No burst given.",
            ));
        };

        Ok(Self { bandwidth, burst })
    }
}
impl OpenDALBackend {
pub fn new(path: impl AsRef<str>, options: BTreeMap<String, String>) -> RusticResult<Self> {
let max_retries = match options.get("retry").map(String::as_str) {
Some("false" | "off") => 0,
None | Some("default") => constants::DEFAULT_RETRY,
Some(value) => usize::from_str(value).map_err(|err| {
RusticError::with_source(
ErrorKind::InvalidInput,
"Parsing retry value `{value}` failed, the value must be a valid integer.",
err,
)
.attach_context("value", value.to_string())
})?,
};
let connections = options
.get("connections")
.map(|c| {
usize::from_str(c).map_err(|err| {
RusticError::with_source(
ErrorKind::InvalidInput,
"Parsing connections value `{value}` failed, the value must be a valid integer.",
err,
)
.attach_context("value", c)
})
})
.transpose()?;
let throttle = options
.get("throttle")
.map(|t| Throttle::from_str(t))
.transpose()?;
let scheme = path
.as_ref()
.split(':')
.next()
.unwrap_or_else(|| path.as_ref());
let mut operator = opendal::Operator::via_iter(scheme, options)
.map_err(|err| {
RusticError::with_source(
ErrorKind::Backend,
"Creating Operator from path `{path}` failed. Please check the given schema and options.",
err,
)
.attach_context("path", path.as_ref().to_string())
.attach_context("schema", scheme.to_string())
})?
.layer(RetryLayer::new().with_max_times(max_retries).with_jitter());
if let Some(Throttle { bandwidth, burst }) = throttle {
operator = operator.layer(ThrottleLayer::new(bandwidth, burst));
}
if let Some(connections) = connections {
operator = operator.layer(ConcurrentLimitLayer::new(connections));
}
let _guard = runtime().enter();
let operator = Operator::new(operator.layer(LoggingLayer::default())).map_err(|err| {
RusticError::with_source(
ErrorKind::Backend,
"Creating blocking Operator from path `{path}` failed.",
err,
)
.attach_context("path", path.as_ref().to_string())
})?;
Ok(Self { operator })
}
/// Builds the backend-relative path of a repository file.
///
/// * `FileType::Config` maps to `config`.
/// * `FileType::Pack` maps to `data/<first two hex characters>/<hex id>`.
/// * Every other type maps to `<dirname of the type>/<hex id>`.
// `self` is not read here; the lint is silenced rather than changing the method's shape.
#[allow(clippy::unused_self)]
fn path(&self, tpe: FileType, id: &Id) -> String {
    let hex_id = id.to_hex();
    let file_name = &hex_id[..];

    let location = match tpe {
        FileType::Config => UnixPathBuf::from("config"),
        FileType::Pack => {
            let shard = &hex_id[0..2];
            UnixPathBuf::from("data").join(shard).join(file_name)
        }
        _ => UnixPathBuf::from(tpe.dirname()).join(file_name),
    };

    location.to_string()
}
pub fn as_source(self) -> RusticResult<OpenDALReadSource> {
let list_options = ListOptions {
recursive: true,
..Default::default()
};
let mut entries: Vec<_> = self
.operator
.lister_options("", list_options)
.map_err(|err| {
RusticError::with_source(ErrorKind::Backend, "Error listong openDAL source.", err)
})?
.filter_map(|entry| {
if let Ok(e) = &entry
&& e.path() == "/"
{
return None;
}
entry
.inspect_err(|err| warn!("ignoring error on openDAL entry: {err}"))
.ok()
})
.collect();
entries.sort_unstable_by(|e1, e2| e1.path().cmp(e2.path()));
Ok(OpenDALReadSource {
entries,
be: Arc::new(self),
})
}
}
impl ReadBackend for OpenDALBackend {
/// Returns the location string `opendal:<scheme>:<name>` built from the operator info.
fn location(&self) -> String {
    let info = self.operator.info();
    let (scheme, name) = (info.scheme(), info.name());
    format!("opendal:{scheme}:{name}")
}
/// Lists the ids of all files of the given type.
///
/// For `FileType::Config` the result holds the default id if `config` exists and is empty
/// otherwise. For every other type the directory `<dirname>/` is listed recursively;
/// entries that are not files or whose name does not parse as an id are skipped, and
/// entries that fail to list are logged with `error!` and skipped.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error if the existence check for `config` fails or if
/// the listing cannot be started.
fn list(&self, tpe: FileType) -> RusticResult<Vec<Id>> {
    trace!("listing tpe: {tpe:?}");

    if tpe == FileType::Config {
        // A missing config yields `Ok(false)`; this error path is only reached when the
        // check itself fails, so the message must not claim the path does not exist.
        let exists = self.operator.exists("config").map_err(|err| {
            RusticError::with_source(
                ErrorKind::Backend,
                "Checking whether `config` exists failed in the backend.",
                err,
            )
            .ask_report()
        })?;

        return Ok(if exists {
            vec![Id::default()]
        } else {
            Vec::new()
        });
    }

    let path = tpe.dirname().to_string() + "/";
    let list_options = ListOptions {
        recursive: true,
        ..Default::default()
    };

    let lister = self
        .operator
        .lister_options(&path, list_options)
        .map_err(|err| {
            RusticError::with_source(ErrorKind::Backend, "Listing failed for `{type}`", err)
                .attach_context("type", tpe.to_string())
        })?;

    Ok(lister
        .filter_map(|r| {
            let entry = r
                .inspect_err(|err| error!("error listing {tpe}: {err}"))
                .ok()?;
            if !entry.metadata().is_file() {
                return None;
            }
            Id::parse_some(entry.name(), tpe)
        })
        .collect())
}
fn list_with_size(&self, tpe: FileType) -> RusticResult<Vec<(Id, u32)>> {
fn length(entry: &Metadata, file_name: &str, tpe: FileType) -> Option<u32> {
let length = entry.content_length();
length.try_into().inspect_err(|err| {
error!("Failed to convert file length {length} of {file_name} to u32 while listing {tpe}: {err}");
}).ok()
}
trace!("listing tpe: {tpe:?}");
if tpe == FileType::Config {
return match self.operator.stat("config") {
Ok(meta) => Ok(vec![(Id::default(), length(&meta, "config", tpe).unwrap_or_default())]),
Err(err) if err.kind() == opendal::ErrorKind::NotFound => Ok(Vec::new()),
Err(err) => Err(err).map_err(|err|
RusticError::with_source(
ErrorKind::Backend,
"Getting Metadata of type `{type}` failed in the backend. Please check if `{type}` exists.",
err,
)
.attach_context("type", tpe.to_string())
),
};
}
let path = tpe.dirname().to_string() + "/";
let list_options = ListOptions {
recursive: true,
..Default::default()
};
let lister = self
.operator
.lister_options(&path, list_options)
.map_err(|err| {
RusticError::with_source(ErrorKind::Backend, "Listing failed for `{type}`", err)
.attach_context("type", tpe.to_string())
})?;
let entries = lister
.filter_map(|r| {
let entry = r
.inspect_err(|err| error!("error listing {tpe}: {err}"))
.ok()?;
let metadata = entry.metadata();
if !metadata.is_file() {
return None;
}
let name = entry.name();
let id = Id::parse_some(name, tpe)?;
let length = length(metadata, name, tpe)?;
Some((id, length))
})
.collect();
Ok(entries)
}
/// Reads the complete content of the file identified by `tpe` and `id`.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error carrying the path, type and id if the read fails.
fn read_full(&self, tpe: FileType, id: &Id) -> RusticResult<Bytes> {
    trace!("reading tpe: {tpe:?}, id: {id}");

    let path = self.path(tpe, id);

    match self.operator.read(&path) {
        Ok(buffer) => Ok(buffer.to_bytes()),
        Err(err) => Err(RusticError::with_source(
            ErrorKind::Backend,
            "Reading file `{path}` failed in the backend. Please check if the given path is correct.",
            err,
        )
        .attach_context("path", path)
        .attach_context("type", tpe.to_string())
        .attach_context("id", id.to_string())),
    }
}
/// Reads `length` bytes starting at `offset` from the file identified by `tpe` and `id`.
///
/// `_cacheable` is not used by this backend.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error carrying the path, type, id, offset and length if
/// the read fails.
fn read_partial(
    &self,
    tpe: FileType,
    id: &Id,
    _cacheable: bool,
    offset: u32,
    length: u32,
) -> RusticResult<Bytes> {
    trace!("reading tpe: {tpe:?}, id: {id}, offset: {offset}, length: {length}");

    // Widen both operands *before* adding: `offset + length` in `u32` overflows (panic in
    // debug builds, wrap-around and an inverted range in release builds) once the end of
    // the requested range lies beyond `u32::MAX`. In `u64` the sum can never overflow.
    let start = u64::from(offset);
    let end = start + u64::from(length);
    let range = start..end;

    let path = self.path(tpe, id);
    let read_options = ReadOptions {
        range: range.into(),
        ..Default::default()
    };

    Ok(self
        .operator
        .read_options(&path, read_options)
        .map_err(|err| {
            RusticError::with_source(
                ErrorKind::Backend,
                "Partially reading file `{path}` failed in the backend. Please check if the given path is correct.",
                err,
            )
            .attach_context("path", path)
            .attach_context("type", tpe.to_string())
            .attach_context("id", id.to_string())
            .attach_context("offset", offset.to_string())
            .attach_context("length", length.to_string())
        })?
        .to_bytes())
}
/// Returns the path of a file including the operator's root.
///
/// Leading and trailing `/` are stripped from the root. If nothing remains, the
/// backend-relative path is returned unchanged; otherwise the result is `<root>/<path>`.
fn warmup_path(&self, tpe: FileType, id: &Id) -> String {
    let info = self.operator.info();
    let root = info.root();
    let relative_path = self.path(tpe, id);

    match root.trim_matches('/') {
        "" => relative_path,
        root => format!("{root}/{relative_path}"),
    }
}
}
impl WriteBackend for OpenDALBackend {
/// Creates the directory layout of a repository.
///
/// First one directory per entry of `ALL_FILE_TYPES` is created sequentially, then the 256
/// sub-directories `data/00/` to `data/ff/` are created in parallel.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error for the first directory that cannot be created.
fn create(&self) -> RusticResult<()> {
    trace!("creating repo at {:?}", self.location());

    for tpe in ALL_FILE_TYPES {
        // The trailing slash marks the path as a directory.
        let path = format!("{}/", tpe.dirname());

        if let Err(err) = self.operator.create_dir(&path) {
            return Err(RusticError::with_source(
                ErrorKind::Backend,
                "Creating directory `{path}` failed in the backend `{location}`. Please check if the given path is correct.",
                err,
            )
            .attach_context("path", path)
            .attach_context("location", self.location())
            .attach_context("type", tpe.to_string()));
        }
    }

    (0u8..=255).into_par_iter().try_for_each(|i| {
        let mut path = UnixPathBuf::from("data")
            .join(hex::encode([i]))
            .to_string_lossy()
            .to_string();
        path.push('/');

        self.operator.create_dir(&path).map_err(|err| {
            RusticError::with_source(
                ErrorKind::Backend,
                "Creating directory `{path}` failed in the backend `{location}`. Please check if the given path is correct.",
                err,
            )
            .attach_context("path", path)
            .attach_context("location", self.location())
        })
    })
}
/// Writes `buf` to the file identified by `tpe` and `id`.
///
/// `_cacheable` is not used by this backend, and whatever the operator returns on success
/// is discarded.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error carrying the path, type and id if the write fails.
fn write_bytes(
    &self,
    tpe: FileType,
    id: &Id,
    _cacheable: bool,
    buf: Bytes,
) -> RusticResult<()> {
    trace!("writing tpe: {:?}, id: {}", &tpe, &id);

    let filename = self.path(tpe, id);

    match self.operator.write(&filename, buf) {
        Ok(_) => Ok(()),
        Err(err) => Err(RusticError::with_source(
            ErrorKind::Backend,
            "Writing file `{path}` failed in the backend. Please check if the given path is correct.",
            err,
        )
        .attach_context("path", filename)
        .attach_context("type", tpe.to_string())
        .attach_context("id", id.to_string())),
    }
}
/// Deletes the file identified by `tpe` and `id`.
///
/// `_cacheable` is not used by this backend.
///
/// # Errors
///
/// Returns an `ErrorKind::Backend` error carrying the path, type and id if the deletion
/// fails.
fn remove(&self, tpe: FileType, id: &Id, _cacheable: bool) -> RusticResult<()> {
    trace!("removing tpe: {:?}, id: {}", &tpe, &id);

    let filename = self.path(tpe, id);

    if let Err(err) = self.operator.delete(&filename) {
        return Err(RusticError::with_source(
            ErrorKind::Backend,
            "Deleting file `{path}` failed in the backend. Please check if the given path is correct.",
            err,
        )
        .attach_context("path", filename)
        .attach_context("type", tpe.to_string())
        .attach_context("id", id.to_string()));
    }

    Ok(())
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use anyhow::Result;
    use rstest::rstest;
    use serde::Deserialize;
    use std::{
        fs,
        path::{Path, PathBuf},
    };

    /// Backend construction parameters as stored in a TOML fixture file.
    #[derive(Deserialize)]
    struct TestCase {
        path: String,
        options: BTreeMap<String, String>,
    }

    /// Reads a fixture file and constructs the backend it describes.
    fn backend_from_fixture(fixture: &Path) -> Result<OpenDALBackend> {
        let content = fs::read_to_string(fixture)?;
        let TestCase { path, options } = toml::from_str(&content)?;
        Ok(OpenDALBackend::new(path, options)?)
    }

    // Well-formed throttle strings, with and without whitespace, decimal and binary units.
    #[rstest]
    #[case("10kB,10MB", Throttle { bandwidth: 10_000, burst: 10_000_000 })]
    #[case("10 kB,10 MB", Throttle { bandwidth: 10_000, burst: 10_000_000 })]
    #[case("10kB, 10MB", Throttle { bandwidth: 10_000, burst: 10_000_000 })]
    #[case(" 10kB, 10MB", Throttle { bandwidth: 10_000, burst: 10_000_000 })]
    #[case("10kiB,10MiB", Throttle { bandwidth: 10_240, burst: 10_485_760 })]
    fn correct_throttle(#[case] input: &str, #[case] expected: Throttle) {
        let parsed = Throttle::from_str(input).unwrap();
        assert_eq!(parsed, expected);
    }

    // Empty input, a missing burst, a non-numeric value and a wrong separator must all fail.
    #[rstest]
    #[case("")]
    #[case("10kiB")]
    #[case("no_number,10MiB")]
    #[case("10kB;10MB")]
    fn invalid_throttle(#[case] input: &str) {
        assert!(Throttle::from_str(input).is_err());
    }

    // Every fixture must describe a backend that can be constructed.
    #[rstest]
    fn new_opendal_backend(
        #[files("tests/fixtures/opendal/*.toml")] test_case: PathBuf,
    ) -> Result<()> {
        let _backend = backend_from_fixture(&test_case)?;
        Ok(())
    }

    // The warm-up path must include the configured root and never contain `//`.
    #[rstest]
    #[case("s3_aws", "path/to/repo/data/")]
    #[case("s3_idrive", "data/")]
    fn test_warmup_path_respects_root(
        #[case] fixture: &str,
        #[case] expected_prefix: &str,
    ) -> Result<()> {
        let fixture_path = PathBuf::from(format!("tests/fixtures/opendal/{fixture}.toml"));
        let backend = backend_from_fixture(&fixture_path)?;

        let id: Id = "03dc1178e4e54f69beaf35dd9d4256a5a600e9fa3452b9db80bd649938923e67".parse()?;
        let path = backend.warmup_path(FileType::Pack, &id);

        assert!(
            path.starts_with(expected_prefix),
            "warmup_path should start with '{expected_prefix}', got: {path}"
        );
        assert!(
            !path.contains("//"),
            "warmup_path should not contain double slashes: {path}"
        );

        Ok(())
    }
}
/// Handle for opening a single file: the shared backend plus the path to read from it.
#[derive(Debug)]
pub struct OpenFile(Arc<OpenDALBackend>, String);
impl ReadSourceOpen for OpenFile {
    type Reader = StdReader;

    /// Opens the file and returns a reader over its whole content.
    ///
    /// # Errors
    ///
    /// Returns an `ErrorKind::InputOutput` error carrying the path if the reader cannot be
    /// created or cannot be converted into a [`StdReader`].
    fn open(self) -> RusticResult<Self::Reader> {
        let Self(backend, path) = self;

        // Both steps fail with the same error type, so they are chained and mapped once.
        let opened = backend
            .operator
            .reader(&path)
            .and_then(|reader| reader.into_std_read(..));

        match opened {
            Ok(reader) => Ok(reader),
            Err(err) => Err(RusticError::with_source(
                ErrorKind::InputOutput,
                "Failed to open file at `{path}`. Please make sure the file exists and is accessible.",
                err,
            )
            .attach_context("path", path)),
        }
    }
}
/// Iterator over listed entries that yields one `ReadSourceEntry` per entry.
///
/// Holds the remaining entries and the shared backend that is handed to every [`OpenFile`].
// This type has no `Debug` implementation, so the corresponding lint is silenced explicitly.
#[allow(missing_debug_implementations)]
pub struct OpenDALLister(IntoIter<Entry>, Arc<OpenDALBackend>);
impl Iterator for OpenDALLister {
type Item = RusticResult<ReadSourceEntry<OpenFile>>;
fn next(&mut self) -> Option<Self::Item> {
Ok(self.0.next().map(|e| {
let path = e.path();
let path = path.strip_suffix('/').unwrap_or(path);
let name = OsStr::new(e.name());
let metadata = e.metadata();
let node_type = if metadata.is_dir() {
NodeType::Dir
} else {
NodeType::File
};
let meta = rustic_core::repofile::Metadata {
mtime: metadata
.last_modified()
.map(opendal::raw::Timestamp::into_inner),
size: metadata.content_length(),
..Default::default()
};
let node = Node::new_node(name, node_type, meta);
let open = Some(OpenFile(self.1.clone(), path.to_string()));
ReadSourceEntry {
path: path.into(),
node,
open,
}
}))
.transpose()
}
}
/// Read source backed by a pre-collected list of entries and the backend they came from.
// This type has no `Debug` implementation, so the corresponding lint is silenced explicitly.
#[allow(missing_debug_implementations)]
pub struct OpenDALReadSource {
/// Entries of the source; `OpenDALBackend::as_source` stores them sorted by path.
entries: Vec<Entry>,
/// Shared backend, cloned into every lister created by `entries()`.
be: Arc<OpenDALBackend>,
}
impl ReadSource for OpenDALReadSource {
type Open = OpenFile;
type Iter = OpenDALLister;
fn size(&self) -> RusticResult<Option<u64>> {
let size = self
.entries
.iter()
.map(|e| e.metadata().content_length())
.sum();
Ok(Some(size))
}
fn entries(&self) -> Self::Iter {
OpenDALLister(self.entries.clone().into_iter(), self.be.clone())
}
}