use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::StreamExt;
use object_store::path::Path as ObjPath;
use object_store::{ObjectStore, ObjectStoreExt, PutPayload, WriteMultipart};
use tokio::io::AsyncWriteExt;
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
use crate::applied::ExportRequest;
use crate::artifact::{
ExportManifest, MANIFEST_FILE, check_dest_available, manifest_from_slice, rename_into_place,
};
use crate::export_lease::ExportLease;
use crate::kv::WatchCursor;
use crate::snapshot::SnapshotError;
// Size in bytes (8 MiB) of both the multipart chunk handed to `WriteMultipart`
// and the read buffer used while streaming the artifact tar during upload.
const CHUNK: usize = 8 << 20;
// Limit passed to `wait_for_capacity` before each buffered write during
// upload; also the factor by which `OP_TIMEOUT` is scaled for the final
// multipart `finish` call.
const MAX_CONCURRENT_PARTS: usize = 8;
// Upper bound in bytes (1 MiB) on a remote manifest object; a larger manifest
// is rejected as an invalid artifact while it is being read.
const MAX_MANIFEST_BYTES: usize = 1 << 20;
// Deadline applied to each individual object-store operation wrapped by `timed`.
const OP_TIMEOUT: Duration = Duration::from_secs(30);
/// Awaits `fut` for at most `limit`.
///
/// Returns the future's output on completion. If the deadline elapses first,
/// returns `SnapshotError::Backend` with a message naming the operation
/// (`what`) and the limit in whole seconds.
async fn timed_by<T>(
    what: &str,
    limit: Duration,
    fut: impl std::future::Future<Output = T>,
) -> Result<T, SnapshotError> {
    match tokio::time::timeout(limit, fut).await {
        Ok(output) => Ok(output),
        Err(_elapsed) => {
            let secs = limit.as_secs();
            Err(SnapshotError::Backend(format!(
                "object store: {what} timed out after {}s",
                secs
            )))
        }
    }
}
/// Awaits `fut` under the default per-operation deadline (`OP_TIMEOUT`).
///
/// Thin wrapper over `timed_by`; `what` names the operation in the timeout
/// error message.
async fn timed<T>(
    what: &str,
    fut: impl std::future::Future<Output = T>,
) -> Result<T, SnapshotError> {
    let limit = OP_TIMEOUT;
    timed_by(what, limit, fut).await
}
/// Moves export artifacts between a local directory and remote storage,
/// addressed by a caller-chosen `key`.
///
/// The semantics noted on each method are those of the `ObjectStoreTransport`
/// implementation in this file; other implementations are not visible here.
#[async_trait]
pub trait ArtifactTransport: Send + Sync {
/// Publishes the artifact rooted at `artifact_dir` under `key`.
async fn upload(&self, key: &str, artifact_dir: &Path) -> Result<(), SnapshotError>;
/// Fetches and parses only the manifest stored for `key`.
async fn manifest(&self, key: &str) -> Result<ExportManifest, SnapshotError>;
/// Fetches the artifact stored for `key` into `dest_dir` and returns its manifest.
async fn download(&self, key: &str, dest_dir: &Path) -> Result<ExportManifest, SnapshotError>;
}
/// `ArtifactTransport` backed by an `object_store::ObjectStore`.
pub struct ObjectStoreTransport {
// Backing store that all gets and puts are issued against.
store: Arc<dyn ObjectStore>,
// Path prefix prepended to every key when building the payload and
// manifest object paths.
prefix: ObjPath,
}
impl ObjectStoreTransport {
    /// Builds a transport over an already-constructed store, placing every
    /// object beneath `prefix`.
    pub fn new(store: Arc<dyn ObjectStore>, prefix: impl AsRef<str>) -> Self {
        let prefix = ObjPath::from(prefix.as_ref());
        Self { store, prefix }
    }

    /// Builds a transport from a store URL plus backend options.
    ///
    /// The URL is handed to `object_store::parse_url_opts`, which yields both
    /// the store and the path prefix.
    ///
    /// # Errors
    ///
    /// `SnapshotError::Backend` if `url` does not parse; otherwise whatever
    /// `map_obj` makes of the `object_store` failure.
    pub fn from_url_opts<I, K, V>(url: &str, options: I) -> Result<Self, SnapshotError>
    where
        I: IntoIterator<Item = (K, V)>,
        K: AsRef<str>,
        V: Into<String>,
    {
        let parsed = match url::Url::parse(url) {
            Ok(parsed) => parsed,
            Err(e) => {
                return Err(SnapshotError::Backend(format!(
                    "invalid transport url: {e}"
                )));
            }
        };
        let (store, prefix) = object_store::parse_url_opts(&parsed, options).map_err(map_obj)?;
        Ok(Self {
            store: Arc::from(store),
            prefix,
        })
    }

    /// Object path of the tar payload for `key`: `<prefix>/<key>`.
    fn payload_path(&self, key: &str) -> ObjPath {
        let joined = format!("{}/{key}", self.prefix);
        ObjPath::from(joined)
    }

    /// Object path of the sibling manifest for `key`:
    /// `<prefix>/<key>.manifest.json`.
    fn manifest_path(&self, key: &str) -> ObjPath {
        let joined = format!("{}/{key}.manifest.json", self.prefix);
        ObjPath::from(joined)
    }

    /// Streams the manifest object for `key` into memory.
    ///
    /// Both the initial get and every chunk read run under `OP_TIMEOUT`. The
    /// read is abandoned with `SnapshotError::ArtifactInvalid` as soon as the
    /// accumulated size would exceed `MAX_MANIFEST_BYTES`.
    async fn fetch_manifest_bytes(&self, key: &str) -> Result<Vec<u8>, SnapshotError> {
        let location = self.manifest_path(key);
        let response = timed("manifest get", self.store.get(&location))
            .await?
            .map_err(map_obj)?;
        let mut body = response.into_stream();
        let mut collected = Vec::new();
        loop {
            let Some(piece) = timed("manifest read", body.next()).await? else {
                break;
            };
            let piece = piece.map_err(map_obj)?;
            let grown = collected.len() + piece.len();
            if grown > MAX_MANIFEST_BYTES {
                return Err(SnapshotError::ArtifactInvalid(format!(
                    "remote manifest for {key:?} exceeds {MAX_MANIFEST_BYTES} bytes"
                )));
            }
            collected.extend_from_slice(&piece);
        }
        Ok(collected)
    }
}
#[async_trait]
impl ArtifactTransport for ObjectStoreTransport {
async fn upload(&self, key: &str, artifact_dir: &Path) -> Result<(), SnapshotError> {
let manifest_bytes = tokio::fs::read(artifact_dir.join(MANIFEST_FILE))
.await
.map_err(SnapshotError::Io)?;
manifest_from_slice(&manifest_bytes)?;
let src = artifact_dir.to_path_buf();
let tar_parent = artifact_dir
.parent()
.filter(|p| !p.as_os_str().is_empty())
.ok_or_else(|| {
SnapshotError::ArtifactInvalid(format!(
"artifact {} has no parent directory to stage the tar in",
artifact_dir.display()
))
})?
.to_path_buf();
let tar_file = tokio::task::spawn_blocking(
move || -> Result<tempfile::NamedTempFile, SnapshotError> {
let tmp = tempfile::NamedTempFile::new_in(&tar_parent)?;
let mut builder = tar::Builder::new(std::io::BufWriter::new(tmp.reopen()?));
builder.append_dir_all(".", &src)?;
builder
.into_inner()?
.into_inner()
.map_err(|e| SnapshotError::Io(e.into_error()))?;
Ok(tmp)
},
)
.await
.map_err(|e| SnapshotError::Backend(format!("tar task panicked: {e}")))??;
let mut file = tokio::fs::File::open(tar_file.path())
.await
.map_err(SnapshotError::Io)?;
let upload = timed(
"multipart create",
self.store.put_multipart(&self.payload_path(key)),
)
.await?
.map_err(map_obj)?;
let mut wm = WriteMultipart::new_with_chunk_size(upload, CHUNK);
let mut buf = vec![0u8; CHUNK];
loop {
use tokio::io::AsyncReadExt;
let n = file.read(&mut buf).await.map_err(SnapshotError::Io)?;
if n == 0 {
break;
}
timed("part upload", wm.wait_for_capacity(MAX_CONCURRENT_PARTS))
.await?
.map_err(map_obj)?;
wm.write(&buf[..n]);
}
timed_by(
"multipart finish",
OP_TIMEOUT * MAX_CONCURRENT_PARTS as u32,
wm.finish(),
)
.await?
.map_err(map_obj)?;
timed(
"manifest put",
self.store
.put(&self.manifest_path(key), PutPayload::from(manifest_bytes)),
)
.await?
.map_err(map_obj)?;
Ok(())
}
async fn manifest(&self, key: &str) -> Result<ExportManifest, SnapshotError> {
let bytes = self.fetch_manifest_bytes(key).await?;
manifest_from_slice(&bytes)
}
async fn download(&self, key: &str, dest_dir: &Path) -> Result<ExportManifest, SnapshotError> {
check_dest_available(dest_dir)?;
let parent = dest_dir
.parent()
.filter(|p| !p.as_os_str().is_empty())
.ok_or_else(|| {
SnapshotError::ArtifactInvalid(format!(
"destination {} has no parent directory",
dest_dir.display()
))
})?;
let sibling = self.fetch_manifest_bytes(key).await?;
let manifest = manifest_from_slice(&sibling)?;
let tar_tmp = tempfile::NamedTempFile::new_in(parent)?;
let mut tar_writer = tokio::fs::File::create(tar_tmp.path())
.await
.map_err(SnapshotError::Io)?;
let mut stream = timed("payload get", self.store.get(&self.payload_path(key)))
.await?
.map_err(map_obj)?
.into_stream();
while let Some(chunk) = timed("payload read", stream.next()).await? {
let chunk = chunk.map_err(map_obj)?;
tar_writer
.write_all(&chunk)
.await
.map_err(SnapshotError::Io)?;
}
tar_writer.flush().await.map_err(SnapshotError::Io)?;
drop(tar_writer);
let dest = dest_dir.to_path_buf();
let parent = parent.to_path_buf();
tokio::task::spawn_blocking(move || -> Result<(), SnapshotError> {
let stage = tempfile::Builder::new()
.prefix(".slipstream-download-")
.tempdir_in(&parent)?;
let file = std::fs::File::open(tar_tmp.path())?;
let mut archive = tar::Archive::new(std::io::BufReader::new(file));
archive.set_preserve_permissions(false);
archive.set_preserve_mtime(false);
archive.unpack(stage.path())?;
let embedded = std::fs::read(stage.path().join(MANIFEST_FILE)).map_err(|_| {
SnapshotError::ArtifactInvalid(
"downloaded artifact tar has no embedded manifest".into(),
)
})?;
if embedded != sibling {
return Err(SnapshotError::ArtifactInvalid(
"embedded manifest disagrees with the sibling manifest object".into(),
));
}
check_dest_available(&dest)?;
let root = stage.keep();
rename_into_place(&root, &dest)?;
Ok(())
})
.await
.map_err(|e| SnapshotError::Backend(format!("untar task panicked: {e}")))??;
Ok(manifest)
}
}
fn map_obj(e: object_store::Error) -> SnapshotError {
match e {
object_store::Error::NotFound { path, .. } => {
SnapshotError::ArtifactInvalid(format!("remote artifact object not found: {path}"))
}
other => SnapshotError::Backend(format!("object store: {other}")),
}
}
/// Runs one export round: acquire the lease, ask the watch loop for an
/// export, upload the result, and record completion.
///
/// Returns `Ok(None)` when the lease could not be acquired. After the lease
/// is held, any failure abandons it before the error is returned. A failure
/// to record completion after a successful upload is only logged; the round
/// still returns the manifest.
///
/// The export is written into a fresh temp directory under `scratch_dir`,
/// which is kept until the lease has been completed or abandoned.
pub async fn run_export_round(
    lease: &ExportLease,
    ttl: Duration,
    exports: &mpsc::Sender<ExportRequest>,
    transport: &dyn ArtifactTransport,
    key: &str,
    scratch_dir: &Path,
) -> Result<Option<ExportManifest>, SnapshotError> {
    let acquired = lease
        .try_acquire(ttl)
        .await
        .map_err(|e| SnapshotError::Backend(format!("export lease: {e}")))?;
    let guard = match acquired {
        Some(guard) => guard,
        None => return Ok(None),
    };

    let scratch = tempfile::Builder::new()
        .prefix(".slipstream-export-round-")
        .tempdir_in(scratch_dir);
    let round_dir = match scratch {
        Ok(dir) => dir,
        Err(e) => {
            guard.abandon().await;
            return Err(SnapshotError::Io(e));
        }
    };
    let artifact_dir = round_dir.path().join("artifact");

    // Everything that can fail while the lease is held runs in this block,
    // so the abandon-on-error handling lives in exactly one place below.
    let attempt = async {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = ExportRequest {
            dest_dir: artifact_dir.clone(),
            reply: reply_tx,
        };
        exports.send(request).await.map_err(|_| {
            SnapshotError::Backend("watch loop is gone; export request not delivered".into())
        })?;
        let manifest = reply_rx
            .await
            .map_err(|_| SnapshotError::Backend("watch loop dropped the export reply".into()))??;
        transport.upload(key, &artifact_dir).await?;
        Ok::<ExportManifest, SnapshotError>(manifest)
    }
    .await;

    match attempt {
        Err(e) => {
            guard.abandon().await;
            Err(e)
        }
        Ok(manifest) => {
            if let Err(e) = guard.complete(&manifest.cursor).await {
                warn!(key, error = %e, "export round uploaded but completion record failed");
            }
            // Remove the scratch directory only once the round is fully settled.
            drop(round_dir);
            Ok(Some(manifest))
        }
    }
}
/// Downloads the artifact for `key` into a fresh temp directory under
/// `scratch_dir`.
///
/// Returns the temp-directory guard together with the path of the downloaded
/// artifact inside it (`<tmp>/artifact`). The caller must keep the guard
/// alive for as long as it uses that path.
async fn download_to_scratch(
    transport: &dyn ArtifactTransport,
    key: &str,
    scratch_dir: &Path,
) -> Result<(tempfile::TempDir, PathBuf), SnapshotError> {
    let scratch = tempfile::Builder::new()
        .prefix(".slipstream-bootstrap-")
        .tempdir_in(scratch_dir)?;
    let artifact = scratch.path().join("artifact");
    match transport.download(key, &artifact).await {
        Ok(_manifest) => Ok((scratch, artifact)),
        Err(e) => Err(e),
    }
}
impl crate::AppendLogSnapshot {
    /// Downloads the artifact for `key` and imports it at `dest_path`.
    ///
    /// The artifact is first fetched into a temp directory under
    /// `scratch_dir`, and `Self::import` then runs on a blocking thread. The
    /// temp-directory guard is held until this function returns.
    ///
    /// # Errors
    ///
    /// Download and import errors are passed through; a panicked import task
    /// is reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_path: &Path,
        compact_threshold: u64,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_scratch, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_path.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, compact_threshold))
                .await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}
#[cfg(feature = "fjall")]
impl crate::FjallSnapshot {
    /// Downloads the artifact for `key` and imports it into `dest_dir` using
    /// `config`.
    ///
    /// The artifact is first fetched into a temp directory under
    /// `scratch_dir`, and `Self::import` then runs on a blocking thread. The
    /// temp-directory guard is held until this function returns.
    ///
    /// # Errors
    ///
    /// Download and import errors are passed through; a panicked import task
    /// is reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_dir: &Path,
        config: crate::FjallConfig,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_scratch, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_dir.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, config)).await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}
#[cfg(feature = "rocksdb")]
impl crate::RocksDbSnapshot {
    /// Downloads the artifact for `key` and imports it into `dest_dir` using
    /// `config`.
    ///
    /// The artifact is first fetched into a temp directory under
    /// `scratch_dir`, and `Self::import` then runs on a blocking thread. The
    /// temp-directory guard is held until this function returns.
    ///
    /// # Errors
    ///
    /// Download and import errors are passed through; a panicked import task
    /// is reported as `SnapshotError::Backend`.
    pub async fn import_remote(
        transport: &dyn ArtifactTransport,
        key: &str,
        scratch_dir: &Path,
        dest_dir: &Path,
        config: crate::RocksDbConfig,
    ) -> Result<(WatchCursor, Self), SnapshotError> {
        let (_scratch, artifact) = download_to_scratch(transport, key, scratch_dir).await?;
        let dest = dest_dir.to_path_buf();
        let joined =
            tokio::task::spawn_blocking(move || Self::import(&artifact, &dest, config)).await;
        match joined {
            Ok(imported) => imported,
            Err(e) => Err(SnapshotError::Backend(format!(
                "import task panicked: {e}"
            ))),
        }
    }
}