use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::StreamExt;
use object_store::path::Path as ObjPath;
use object_store::{
ObjectStore, ObjectStoreExt, PutMode, PutOptions, PutPayload, UpdateVersion, WriteMultipart,
};
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
use crate::applied::ExportRequest;
use crate::artifact::{
ExportManifest, MANIFEST_FILE, check_dest_available, hex_encode, manifest_from_slice,
rename_into_place,
};
use crate::export_lease::ExportLease;
use crate::kv::WatchCursor;
use crate::protocol::{PointerState, payload_prunable, pointer_publish_allowed};
use crate::snapshot::SnapshotError;
// Part size handed to the multipart writer and size of the read buffer used
// while streaming the staged tar during upload (8 MiB).
const CHUNK: usize = 8 << 20;
// Upper bound passed to `wait_for_capacity` during upload; also scales the
// timeout applied to the final multipart `finish`.
const MAX_CONCURRENT_PARTS: usize = 8;
// Largest pointer/manifest object accepted when reading it back from the store (1 MiB).
const MAX_MANIFEST_BYTES: usize = 1 << 20;
// Default per-operation deadline used by `timed`.
const OP_TIMEOUT: Duration = Duration::from_secs(30);
// Number of read-then-conditional-put attempts `swap_pointer` makes before giving up.
const MAX_SWAP_ATTEMPTS: usize = 8;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PublishOutcome {
Published,
SupersededByNewer {
remote_cursor: crate::kv::WatchCursor,
},
}
/// Maps a cursor to the integer used for ordering pointers and naming payloads.
///
/// Falls back to `0` when `as_u64` yields no value.
fn cursor_rank(c: &WatchCursor) -> u64 {
    c.as_u64().unwrap_or_default()
}
/// Awaits `fut`, failing if it does not finish within `limit`.
///
/// `what` names the operation in the error text. On expiry this returns
/// `SnapshotError::Backend`; otherwise the future's output is passed through
/// untouched (it may itself be a `Result` the caller still has to inspect).
async fn timed_by<T>(
    what: &str,
    limit: Duration,
    fut: impl std::future::Future<Output = T>,
) -> Result<T, SnapshotError> {
    match tokio::time::timeout(limit, fut).await {
        Ok(output) => Ok(output),
        Err(_elapsed) => Err(SnapshotError::Backend(format!(
            "object store: {what} timed out after {}s",
            limit.as_secs()
        ))),
    }
}
/// Same as `timed_by`, using the default `OP_TIMEOUT` deadline.
async fn timed<T>(
    what: &str,
    fut: impl std::future::Future<Output = T>,
) -> Result<T, SnapshotError> {
    let limit = OP_TIMEOUT;
    timed_by(what, limit, fut).await
}
/// Moves export artifacts between a local directory and a remote location
/// identified by `key`.
#[async_trait]
pub trait ArtifactTransport: Send + Sync {
    /// Uploads the artifact found in `artifact_dir` under `key` and reports
    /// whether it was published or superseded.
    async fn upload(
        &self,
        key: &str,
        artifact_dir: &Path,
    ) -> Result<PublishOutcome, SnapshotError>;

    /// Returns the manifest currently stored for `key`.
    async fn manifest(&self, key: &str) -> Result<ExportManifest, SnapshotError>;

    /// Removes stale remote data for `key`, returning how many objects were
    /// deleted. The default implementation deletes nothing and returns `0`.
    async fn prune(&self, key: &str, grace: Duration) -> Result<usize, SnapshotError> {
        let (_key, _grace) = (key, grace);
        Ok(0)
    }

    /// Downloads the artifact stored for `key` into `dest_dir` and returns its
    /// manifest.
    async fn download(&self, key: &str, dest_dir: &Path) -> Result<ExportManifest, SnapshotError>;
}
/// `ArtifactTransport` backed by an `object_store::ObjectStore`.
///
/// Objects for a key live under `prefix`: a `<key>.manifest.json` pointer and
/// tarred payloads under `<key>.payloads/`.
pub struct ObjectStoreTransport {
// Backing object store used for every get/put/list/delete.
store: Arc<dyn ObjectStore>,
// Path prefix prepended to every object location built by this transport.
prefix: ObjPath,
// When true, a store that reports conditional updates as unimplemented is
// tolerated by falling back to an unconditional pointer put.
allow_non_atomic_pointer: bool,
}
impl ObjectStoreTransport {
/// Builds a transport over `store`, rooting every object under `prefix`.
///
/// The non-atomic pointer fallback starts disabled.
pub fn new(store: Arc<dyn ObjectStore>, prefix: impl AsRef<str>) -> Self {
    let prefix = ObjPath::from(prefix.as_ref());
    Self {
        store,
        prefix,
        allow_non_atomic_pointer: false,
    }
}
pub fn with_non_atomic_pointer_fallback(mut self) -> Self {
self.allow_non_atomic_pointer = true;
self
}
/// Builds a transport from a store URL plus backend options.
///
/// The store and the path prefix are both derived from `url` via
/// `object_store::parse_url_opts`. The non-atomic pointer fallback starts
/// disabled.
///
/// # Errors
/// `SnapshotError::Backend` if `url` does not parse; store construction
/// errors are converted through `map_obj`.
pub fn from_url_opts<I, K, V>(url: &str, options: I) -> Result<Self, SnapshotError>
where
    I: IntoIterator<Item = (K, V)>,
    K: AsRef<str>,
    V: Into<String>,
{
    let parsed = match url::Url::parse(url) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(SnapshotError::Backend(format!(
                "invalid transport url: {e}"
            )));
        }
    };
    let (store, prefix) = object_store::parse_url_opts(&parsed, options).map_err(map_obj)?;
    let store = Arc::from(store);
    Ok(Self {
        store,
        prefix,
        allow_non_atomic_pointer: false,
    })
}
/// Location of the directory holding every payload object for `key`.
fn payloads_dir(&self, key: &str) -> ObjPath {
    let location = format!("{}/{key}.payloads", self.prefix);
    ObjPath::from(location)
}
/// Location of the payload tar that belongs to a given pointer.
///
/// The file name is the cursor rank as 16 zero-padded hex digits, a dash,
/// and the hex of the first 8 bytes of the BLAKE3 hash of `pointer_bytes`.
fn payload_path(&self, key: &str, cursor: &WatchCursor, pointer_bytes: &[u8]) -> ObjPath {
    let rank = cursor_rank(cursor);
    let digest = blake3::hash(pointer_bytes);
    let hex = hex_encode(&digest.as_bytes()[..8]);
    let location = format!(
        "{}/{key}.payloads/{:016x}-{hex}.tar",
        self.prefix, rank
    );
    ObjPath::from(location)
}
/// Location of the pointer (manifest) object for `key`.
fn manifest_path(&self, key: &str) -> ObjPath {
    let location = format!("{}/{key}.manifest.json", self.prefix);
    ObjPath::from(location)
}
async fn swap_pointer(
&self,
key: &str,
new_cursor: &WatchCursor,
pointer_bytes: &[u8],
) -> Result<PublishOutcome, SnapshotError> {
let path = self.manifest_path(key);
for _ in 0..MAX_SWAP_ATTEMPTS {
let current = match timed("pointer get", self.store.get(&path)).await? {
Ok(get) => {
let meta = get.meta.clone();
let mut stream = get.into_stream();
let mut buf = Vec::new();
while let Some(chunk) = timed("pointer read", stream.next()).await? {
let chunk = chunk.map_err(map_obj)?;
if buf.len() + chunk.len() > MAX_MANIFEST_BYTES {
return Err(SnapshotError::ArtifactInvalid(format!(
"remote pointer for {key:?} exceeds {MAX_MANIFEST_BYTES} bytes"
)));
}
buf.extend_from_slice(&chunk);
}
Some((meta, buf))
}
Err(object_store::Error::NotFound { .. }) => None,
Err(e) => return Err(map_obj(e)),
};
match current {
None => {
let opts = PutOptions::from(PutMode::Create);
match timed(
"pointer create",
self.store
.put_opts(&path, PutPayload::from(pointer_bytes.to_vec()), opts),
)
.await?
{
Ok(_) => return Ok(PublishOutcome::Published),
Err(object_store::Error::AlreadyExists { .. }) => continue, Err(e) => return Err(map_obj(e)),
}
}
Some((meta, bytes)) => {
let existing = manifest_from_slice(&bytes).ok();
let observed = PointerState::Present {
rank: existing.as_ref().map(|m| cursor_rank(&m.cursor)),
};
if !pointer_publish_allowed(&observed, cursor_rank(new_cursor)) {
let existing =
existing.expect("refusal implies a parseable, newer pointer");
return Ok(PublishOutcome::SupersededByNewer {
remote_cursor: existing.cursor,
});
}
let opts = PutOptions::from(PutMode::Update(UpdateVersion {
e_tag: meta.e_tag,
version: meta.version,
}));
match timed(
"pointer swap",
self.store
.put_opts(&path, PutPayload::from(pointer_bytes.to_vec()), opts),
)
.await?
{
Ok(_) => return Ok(PublishOutcome::Published),
Err(object_store::Error::Precondition { .. }) => continue, Err(object_store::Error::NotFound { .. }) => continue, Err(object_store::Error::NotImplemented { .. }) => {
if !self.allow_non_atomic_pointer {
return Err(SnapshotError::Backend(format!(
"object store lacks conditional puts (PutMode::Update \
unimplemented); the pointer swap for {key:?} cannot be \
atomic and this store is outside the verified protocol. \
For dev/test stores (file://), opt in explicitly with \
ObjectStoreTransport::with_non_atomic_pointer_fallback()"
)));
}
warn!(
key,
"non-atomic pointer fallback (explicit opt-in): publish is \
read-check-then-put; concurrent publishers may race"
);
timed(
"pointer put (non-atomic fallback)",
self.store
.put(&path, PutPayload::from(pointer_bytes.to_vec())),
)
.await?
.map_err(map_obj)?;
return Ok(PublishOutcome::Published);
}
Err(e) => return Err(map_obj(e)),
}
}
}
}
Err(SnapshotError::Backend(format!(
"pointer swap for {key:?} lost {MAX_SWAP_ATTEMPTS} consecutive CAS races; giving up"
)))
}
/// Reads the raw pointer object for `key`, bounded by `MAX_MANIFEST_BYTES`.
///
/// # Errors
/// `SnapshotError::ArtifactInvalid` when the object is larger than the
/// bound; store errors are converted through `map_obj`; each store
/// operation is subject to the default timeout.
async fn fetch_manifest_bytes(&self, key: &str) -> Result<Vec<u8>, SnapshotError> {
    let location = self.manifest_path(key);
    let response = timed("manifest get", self.store.get(&location))
        .await?
        .map_err(map_obj)?;
    let mut chunks = response.into_stream();
    let mut collected = Vec::new();
    while let Some(next) = timed("manifest read", chunks.next()).await? {
        let piece = next.map_err(map_obj)?;
        // Check before appending so an oversized object is never buffered.
        if collected.len() + piece.len() > MAX_MANIFEST_BYTES {
            return Err(SnapshotError::ArtifactInvalid(format!(
                "remote manifest for {key:?} exceeds {MAX_MANIFEST_BYTES} bytes"
            )));
        }
        collected.extend_from_slice(&piece);
    }
    Ok(collected)
}
}
#[async_trait]
impl ArtifactTransport for ObjectStoreTransport {
/// Tars `artifact_dir`, uploads the tar as a multipart payload, then swaps
/// the pointer for `key` to the artifact's manifest.
///
/// The tar is staged as a temp file in the parent of `artifact_dir` and
/// built on a blocking thread. The payload location is derived from the
/// manifest cursor and the manifest bytes.
///
/// # Errors
/// `SnapshotError::Io` for local file failures, `ArtifactInvalid` when the
/// artifact has no usable parent directory, and `Backend` for store
/// failures, timeouts, or a panicked tar task.
async fn upload(
    &self,
    key: &str,
    artifact_dir: &Path,
) -> Result<PublishOutcome, SnapshotError> {
    use tokio::io::AsyncReadExt;

    let manifest_bytes = tokio::fs::read(artifact_dir.join(MANIFEST_FILE))
        .await
        .map_err(SnapshotError::Io)?;
    let manifest = manifest_from_slice(&manifest_bytes)?;

    let source_dir = artifact_dir.to_path_buf();
    let Some(staging_dir) = artifact_dir
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .map(Path::to_path_buf)
    else {
        return Err(SnapshotError::ArtifactInvalid(format!(
            "artifact {} has no parent directory to stage the tar in",
            artifact_dir.display()
        )));
    };

    // Build the tar off the async runtime; the temp file is returned so it
    // stays alive (and on disk) until the upload below has finished.
    let build_tar = move || -> Result<tempfile::NamedTempFile, SnapshotError> {
        let tmp = tempfile::NamedTempFile::new_in(&staging_dir)?;
        let sink = std::io::BufWriter::new(tmp.reopen()?);
        let mut builder = tar::Builder::new(sink);
        builder.append_dir_all(".", &source_dir)?;
        builder
            .into_inner()?
            .into_inner()
            .map_err(|e| SnapshotError::Io(e.into_error()))?;
        Ok(tmp)
    };
    let tar_file = tokio::task::spawn_blocking(build_tar)
        .await
        .map_err(|e| SnapshotError::Backend(format!("tar task panicked: {e}")))??;

    let mut file = tokio::fs::File::open(tar_file.path())
        .await
        .map_err(SnapshotError::Io)?;

    let payload_location = self.payload_path(key, &manifest.cursor, &manifest_bytes);
    let upload = timed(
        "multipart create",
        self.store.put_multipart(&payload_location),
    )
    .await?
    .map_err(map_obj)?;
    let mut wm = WriteMultipart::new_with_chunk_size(upload, CHUNK);

    let mut buf = vec![0u8; CHUNK];
    loop {
        match file.read(&mut buf).await.map_err(SnapshotError::Io)? {
            0 => break,
            n => {
                // Bound the number of in-flight parts before queueing more.
                timed("part upload", wm.wait_for_capacity(MAX_CONCURRENT_PARTS))
                    .await?
                    .map_err(map_obj)?;
                wm.write(&buf[..n]);
            }
        }
    }

    // Finishing may have to drain every in-flight part, hence the scaled deadline.
    let finish_limit = OP_TIMEOUT * MAX_CONCURRENT_PARTS as u32;
    timed_by("multipart finish", finish_limit, wm.finish())
        .await?
        .map_err(map_obj)?;

    self.swap_pointer(key, &manifest.cursor, &manifest_bytes)
        .await
}
/// Fetches and parses the pointer object currently stored for `key`.
async fn manifest(&self, key: &str) -> Result<ExportManifest, SnapshotError> {
    self.fetch_manifest_bytes(key)
        .await
        .and_then(|raw| manifest_from_slice(&raw))
}
/// Downloads the artifact published for `key` and materialises it at
/// `dest_dir`.
///
/// Steps: read the pointer object, stream the matching payload tar into a
/// temp file next to `dest_dir`, unpack it into a staging directory on a
/// blocking thread, verify that the manifest embedded in the tar is
/// byte-identical to the pointer object, then rename the staging directory
/// into place.
///
/// # Errors
/// `SnapshotError::ArtifactInvalid` when `dest_dir` has no parent, the tar
/// has no embedded manifest, or the embedded manifest disagrees with the
/// pointer; `Io` for local file failures; `Backend` for store failures,
/// timeouts, or a panicked untar task. `check_dest_available` errors are
/// propagated unchanged.
async fn download(&self, key: &str, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
    check_dest_available(dest_dir)?;
    let parent = dest_dir
        .parent()
        .filter(|p| !p.as_os_str().is_empty())
        .ok_or_else(|| {
            SnapshotError::ArtifactInvalid(format!(
                "destination {} has no parent directory",
                dest_dir.display()
            ))
        })?;
    let sibling = self.fetch_manifest_bytes(key).await?;
    let manifest = manifest_from_slice(&sibling)?;
    let tar_tmp = tempfile::NamedTempFile::new_in(parent)?;
    let mut tar_writer = tokio::fs::File::create(tar_tmp.path())
        .await
        .map_err(SnapshotError::Io)?;
    let mut stream = timed(
        "payload get",
        self.store
            .get(&self.payload_path(key, &manifest.cursor, &sibling)),
    )
    .await?
    .map_err(map_obj)?
    .into_stream();
    while let Some(chunk) = timed("payload read", stream.next()).await? {
        let chunk = chunk.map_err(map_obj)?;
        tar_writer
            .write_all(&chunk)
            .await
            .map_err(SnapshotError::Io)?;
    }
    // Flush before dropping so the blocking task below sees the full tar.
    tar_writer.flush().await.map_err(SnapshotError::Io)?;
    drop(tar_writer);
    let dest = dest_dir.to_path_buf();
    let parent = parent.to_path_buf();
    tokio::task::spawn_blocking(move || -> Result<(), SnapshotError> {
        let stage = tempfile::Builder::new()
            .prefix(".slipstream-download-")
            .tempdir_in(&parent)?;
        let file = std::fs::File::open(tar_tmp.path())?;
        let mut archive = tar::Archive::new(std::io::BufReader::new(file));
        archive.set_preserve_permissions(false);
        archive.set_preserve_mtime(false);
        archive.unpack(stage.path())?;
        let embedded = std::fs::read(stage.path().join(MANIFEST_FILE)).map_err(|_| {
            SnapshotError::ArtifactInvalid(
                "downloaded artifact tar has no embedded manifest".into(),
            )
        })?;
        if embedded != sibling {
            return Err(SnapshotError::ArtifactInvalid(
                "embedded manifest disagrees with the sibling manifest object".into(),
            ));
        }
        // Re-check right before the rename: the destination may have
        // appeared while the payload was downloading.
        check_dest_available(&dest)?;
        // Keep the temp-dir guard armed across the rename so a failed rename
        // still cleans up the staged tree instead of leaking it on disk.
        let root = stage.path().to_path_buf();
        rename_into_place(&root, &dest)?;
        // The tree now lives at `dest`; disarm the guard so its drop does not
        // attempt to remove the (now vacated) staging path.
        let _ = stage.keep();
        Ok(())
    })
    .await
    .map_err(|e| SnapshotError::Backend(format!("untar task panicked: {e}")))??;
    Ok(manifest)
}
/// Deletes stale payload objects for `key` and returns how many were removed.
///
/// Best-effort: if the pointer cannot be fetched or parsed, nothing is
/// deleted and `Ok(0)` is returned. Otherwise every object listed under the
/// key's payload directory is offered to `payload_prunable` together with
/// the rank parsed from its file name, the current pointer's rank, whether
/// it is the payload the pointer references, and whether its last-modified
/// time is at or before `now - grace`.
///
/// # Errors
/// Listing and delete failures (including timeouts) are returned; objects
/// deleted before the failure stay deleted.
async fn prune(&self, key: &str, grace: Duration) -> Result<usize, SnapshotError> {
    let pointer = match self.fetch_manifest_bytes(key).await {
        Ok(b) => b,
        Err(_) => return Ok(0),
    };
    let Ok(current) = manifest_from_slice(&pointer) else {
        return Ok(0);
    };
    let keep = self.payload_path(key, &current.cursor, &pointer);
    let pointer_rank = cursor_rank(&current.cursor);
    // A cutoff of 0 (epoch) results when `now - grace` is not representable,
    // which makes nothing look old enough.
    let cutoff_millis = std::time::SystemTime::now()
        .checked_sub(grace)
        .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
        .map_or(0, |d| i64::try_from(d.as_millis()).unwrap_or(i64::MAX));
    let mut deleted = 0usize;
    let mut listing = self.store.list(Some(&self.payloads_dir(key)));
    while let Some(meta) = timed("payload list", listing.next()).await? {
        let meta = meta.map_err(map_obj)?;
        // Payload names start with the rank as hex, followed by '-'.
        let payload_rank = meta
            .location
            .filename()
            .and_then(|f| f.split('-').next())
            .and_then(|h| u64::from_str_radix(h, 16).ok());
        if payload_prunable(
            payload_rank,
            pointer_rank,
            meta.location == keep,
            meta.last_modified.timestamp_millis() <= cutoff_millis,
        ) {
            timed("payload delete", self.store.delete(&meta.location))
                .await?
                .map_err(map_obj)?;
            deleted += 1;
        }
    }
    Ok(deleted)
}
}
/// Converts an object-store error into a `SnapshotError`.
///
/// A missing object becomes `ArtifactInvalid`; everything else becomes
/// `Backend` carrying the original error's display text.
fn map_obj(e: object_store::Error) -> SnapshotError {
    match e {
        object_store::Error::NotFound { path, .. } => {
            let message = format!("remote artifact object not found: {path}");
            SnapshotError::ArtifactInvalid(message)
        }
        other => {
            let message = format!("object store: {other}");
            SnapshotError::Backend(message)
        }
    }
}
/// Runs one export round: acquire the lease, ask the watch loop for an
/// export, upload it, record completion, and prune stale payloads.
///
/// Returns `Ok(None)` when the lease could not be acquired. Every failure
/// after acquisition abandons the lease before the error is returned. A
/// failure to record completion, or to prune, is only logged. Pruning runs
/// only when the upload was actually published and uses a grace period of
/// four times `ttl`.
///
/// The export is written under a temp directory created in `scratch_dir`,
/// which is removed when the round ends.
pub async fn run_export_round(
    lease: &ExportLease,
    ttl: Duration,
    exports: &mpsc::Sender<ExportRequest>,
    transport: &dyn ArtifactTransport,
    key: &str,
    scratch_dir: &Path,
) -> Result<Option<ExportManifest>, SnapshotError> {
    let acquired = lease
        .try_acquire(ttl)
        .await
        .map_err(|e| SnapshotError::Backend(format!("export lease: {e}")))?;
    let Some(guard) = acquired else {
        return Ok(None);
    };

    let round_dir = match tempfile::Builder::new()
        .prefix(".slipstream-export-round-")
        .tempdir_in(scratch_dir)
    {
        Ok(dir) => dir,
        Err(e) => {
            guard.abandon().await;
            return Err(SnapshotError::Io(e));
        }
    };
    let artifact_dir = round_dir.path().join("artifact");

    // Hand the export request to the watch loop and wait for its reply.
    let (reply_tx, reply_rx) = oneshot::channel();
    let request = ExportRequest {
        dest_dir: artifact_dir.clone(),
        reply: reply_tx,
    };
    if exports.send(request).await.is_err() {
        guard.abandon().await;
        return Err(SnapshotError::Backend(
            "watch loop is gone; export request not delivered".into(),
        ));
    }
    let reply = match reply_rx.await {
        Ok(reply) => reply,
        Err(_) => Err(SnapshotError::Backend(
            "watch loop dropped the export reply".into(),
        )),
    };
    let manifest = match reply {
        Ok(manifest) => manifest,
        Err(e) => {
            guard.abandon().await;
            return Err(e);
        }
    };

    let outcome = match transport.upload(key, &artifact_dir).await {
        Ok(outcome) => outcome,
        Err(e) => {
            guard.abandon().await;
            return Err(e);
        }
    };
    if let PublishOutcome::SupersededByNewer { remote_cursor } = &outcome {
        warn!(
            key,
            local = ?manifest.cursor,
            remote = ?remote_cursor,
            "export round superseded by a newer published artifact; pointer left untouched"
        );
    }

    if let Err(e) = guard.complete(&manifest.cursor).await {
        warn!(key, error = %e, "export round uploaded but completion record failed");
    }

    if outcome == PublishOutcome::Published {
        let grace = ttl.saturating_mul(4);
        if let Err(e) = transport.prune(key, grace).await {
            warn!(key, error = %e, "stale payload prune failed; retried next round");
        }
    }

    drop(round_dir);
    Ok(Some(manifest))
}
/// Downloads the artifact for `key` into a fresh temp directory created
/// under `scratch_dir`.
///
/// Returns the temp-directory guard together with the artifact path inside
/// it; the caller must keep the guard alive for as long as it uses the path.
async fn download_to_scratch(
    transport: &dyn ArtifactTransport,
    key: &str,
    scratch_dir: &Path,
) -> Result<(tempfile::TempDir, PathBuf), SnapshotError> {
    let mut builder = tempfile::Builder::new();
    builder.prefix(".slipstream-bootstrap-");
    let holder = builder.tempdir_in(scratch_dir)?;
    let target = holder.path().join("artifact");
    transport.download(key, &target).await?;
    Ok((holder, target))
}
impl crate::AppendLogSnapshot {
    /// Downloads the artifact for `key` into a temp directory under
    /// `scratch_dir` and imports it to `dest_path` on a blocking thread.
    ///
    /// The temp directory is held until this function returns.
    ///
    /// # Errors
    /// Download and import errors are propagated; a panicked import task is
    /// reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_path: &Path,
        compact_threshold: u64,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_guard, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_path.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, compact_threshold))
                .await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}
#[cfg(feature = "fjall")]
impl crate::FjallSnapshot {
    /// Downloads the artifact for `key` into a temp directory under
    /// `scratch_dir` and imports it to `dest_dir` on a blocking thread.
    ///
    /// The temp directory is held until this function returns.
    ///
    /// # Errors
    /// Download and import errors are propagated; a panicked import task is
    /// reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_dir: &Path,
        config: crate::FjallConfig,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_guard, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_dir.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, config)).await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}
#[cfg(feature = "rocksdb")]
impl crate::RocksDbSnapshot {
    /// Downloads the artifact for `key` into a temp directory under
    /// `scratch_dir` and imports it to `dest_dir` on a blocking thread.
    ///
    /// The temp directory is held until this function returns.
    ///
    /// # Errors
    /// Download and import errors are propagated; a panicked import task is
    /// reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_dir: &Path,
        config: crate::RocksDbConfig,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_guard, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_dir.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, config)).await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}