use std::marker::PhantomData;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicI64, Ordering};
use std::time::Duration;
use opendal::Operator;
use opendal::blocking;
use opendal::layers::RetryLayer;
use crate::config::DestinationConfig;
use crate::error::Result;
const ONESHOT_BUDGET_BYTES: i64 = 64 * 1024 * 1024;
static ONESHOT_BUDGET: AtomicI64 = AtomicI64::new(ONESHOT_BUDGET_BYTES);
struct OneShotReservation(i64);
impl Drop for OneShotReservation {
fn drop(&mut self) {
ONESHOT_BUDGET.fetch_add(self.0, Ordering::Relaxed);
}
}
fn reserve_oneshot(size: u64) -> Option<OneShotReservation> {
let size = i64::try_from(size).unwrap_or(i64::MAX);
take_from(&ONESHOT_BUDGET, size).then_some(OneShotReservation(size))
}
fn take_from(budget: &AtomicI64, size: i64) -> bool {
if budget.fetch_sub(size, Ordering::Relaxed) >= size {
true
} else {
budget.fetch_add(size, Ordering::Relaxed);
false
}
}
pub(crate) trait CloudBackend {
const RUNTIME_LABEL: &'static str;
const SCHEME: &'static str;
fn build_operator(config: &DestinationConfig) -> Result<Operator>;
}
pub(crate) struct CloudDestination<B: CloudBackend> {
_runtime: Arc<tokio::runtime::Runtime>,
op: blocking::Operator,
prefix: String,
_backend: PhantomData<fn() -> B>,
}
const DEFAULT_MAX_RETRIES: usize = 5;
impl<B: CloudBackend> CloudDestination<B> {
pub fn new(config: &DestinationConfig) -> Result<Self> {
Self::new_with_retries(config, DEFAULT_MAX_RETRIES)
}
pub fn new_with_retries(config: &DestinationConfig, max_times: usize) -> Result<Self> {
let runtime = Arc::new(
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.map_err(|e| {
anyhow::anyhow!(
"failed to create tokio runtime for {}: {}",
B::RUNTIME_LABEL,
e
)
})?,
);
let _guard = runtime.enter();
let async_op = B::build_operator(config)?.layer(
RetryLayer::new()
.with_max_times(max_times)
.with_min_delay(Duration::from_millis(200))
.with_max_delay(Duration::from_secs(10))
.with_jitter(),
);
let op = blocking::Operator::new(async_op)?;
let prefix = config.prefix.clone().unwrap_or_default();
Ok(Self {
_runtime: runtime,
op,
prefix,
_backend: PhantomData,
})
}
}
impl<B: CloudBackend> super::Destination for CloudDestination<B> {
fn write(&self, local_path: &Path, remote_key: &str) -> Result<super::WriteOutcome> {
let key = format!("{}{}", self.prefix, remote_key);
let size = std::fs::metadata(local_path)?.len();
let outcome = if let Some(_reservation) = reserve_oneshot(size) {
let body = std::fs::read(local_path)?;
let meta = self.op.write(&key, body)?;
super::WriteOutcome {
content_md5: meta
.content_md5()
.map(str::to_string)
.or_else(|| meta.etag().map(|e| e.trim_matches('"').to_string())),
}
} else {
let mut src = std::fs::File::open(local_path)?;
let mut dst = self.op.writer(&key)?.into_std_write();
std::io::copy(&mut src, &mut dst)?;
dst.close()?;
super::WriteOutcome::opaque()
};
log::info!("uploaded {}://{} ({size} bytes)", B::SCHEME, key);
Ok(outcome)
}
fn capabilities(&self) -> super::DestinationCapabilities {
super::DestinationCapabilities {
commit_protocol: super::WriteCommitProtocol::FinalizeOnClose,
idempotent_overwrite: true,
retry_safe: true,
partial_write_risk: false,
}
}
fn list_prefix(&self, prefix: &str) -> Result<Vec<super::ObjectMeta>> {
let full = format!("{}{}", self.prefix, prefix);
let listed = if full.is_empty() || full.ends_with('/') {
self.op.list_options(
&full,
opendal::options::ListOptions {
recursive: true,
..Default::default()
},
)?
} else {
self.op.list_options(
&format!("{}/", full),
opendal::options::ListOptions {
recursive: true,
..Default::default()
},
)?
};
let mut out = Vec::with_capacity(listed.len());
for entry in listed {
if entry.metadata().mode() != opendal::EntryMode::FILE {
continue;
}
let abs = entry.path().to_string();
let rel = abs
.strip_prefix(self.prefix.as_str())
.unwrap_or(abs.as_str())
.to_string();
out.push(super::ObjectMeta {
key: rel,
size_bytes: entry.metadata().content_length(),
content_md5: entry.metadata().content_md5().map(str::to_string),
});
}
Ok(out)
}
fn read(&self, key: &str) -> Result<Vec<u8>> {
let full = format!("{}{}", self.prefix, key);
let buf = self.op.read(&full)?;
Ok(buf.to_vec())
}
fn head(&self, key: &str) -> Result<Option<super::ObjectMeta>> {
let full = format!("{}{}", self.prefix, key);
match self.op.stat(&full) {
Ok(meta) => Ok(Some(super::ObjectMeta {
key: key.to_string(),
size_bytes: meta.content_length(),
content_md5: meta.content_md5().map(str::to_string),
})),
Err(e) if e.kind() == opendal::ErrorKind::NotFound => Ok(None),
Err(e) => Err(e.into()),
}
}
fn r#move(&self, from: &str, to: &str) -> Result<()> {
let from_full = format!("{}{}", self.prefix, from);
let to_full = format!("{}{}", self.prefix, to);
self.op.copy(&from_full, &to_full)?;
self.op.delete(&from_full)?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::{AtomicI64, CloudDestination, Ordering, take_from};
use crate::config::{DestinationConfig, DestinationType};
use crate::destination::gcs::GcsBackend;
#[test]
fn new_with_retries_zero_builds_no_retry_probe_destination() {
let cfg = DestinationConfig {
destination_type: DestinationType::Gcs,
bucket: Some("rivet-fastfail-probe".into()),
allow_anonymous: true,
endpoint: Some("http://127.0.0.1:4443".into()),
..Default::default()
};
CloudDestination::<GcsBackend>::new_with_retries(&cfg, 0)
.expect("no-retry probe destination must build");
}
#[test]
fn oneshot_budget_reserves_until_exhausted_then_streams() {
let budget = AtomicI64::new(100);
assert!(take_from(&budget, 60), "first fits");
assert!(take_from(&budget, 30), "second fits (10 left)");
assert!(!take_from(&budget, 30), "third overdraws → stream");
assert_eq!(
budget.load(Ordering::Relaxed),
10,
"budget intact after overdraw"
);
budget.fetch_add(60, Ordering::Relaxed);
assert!(take_from(&budget, 30), "fits again after release");
}
#[test]
fn part_larger_than_whole_budget_never_reserves() {
let budget = AtomicI64::new(64);
assert!(
!take_from(&budget, 1_000),
"part bigger than budget streams"
);
assert_eq!(budget.load(Ordering::Relaxed), 64, "budget untouched");
}
}