use bytes::{Bytes, BytesMut};
use std::fs::File;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::time::SystemTime;
use tokio::io::AsyncReadExt;
use xxhash_rust::xxh3::{Xxh3, xxh3_128};
use crate::constants;
use crate::error::OxenError;
use crate::model::MerkleHash;
use crate::util::hasher::HashingReader;
/// Marker inserted after the target's file name when naming scratch files.
///
/// `AtomicTempFile::create` builds the scratch-file prefix as
/// `<target file name>` followed by this infix; the temp-file builder then
/// appends its own random characters.
const ATOMIC_TEMP_INFIX: &str = ".oxentmp.";
/// A scratch file paired with the final path it is meant to replace.
///
/// Bytes are written to `tmp`; `commit` then renames the scratch file onto
/// `target`. Until `commit` is called the scratch file is still owned by the
/// `tempfile::NamedTempFile` handle.
struct AtomicTempFile {
// Scratch file that receives the bytes before the rename.
tmp: tempfile::NamedTempFile,
// Final destination the scratch file is renamed onto by `commit`.
target: PathBuf,
}
impl AtomicTempFile {
    /// Creates an empty scratch file in the same directory as `target`.
    ///
    /// Missing parent directories of `target` are created first. The scratch
    /// file's name starts with the target's file name followed by
    /// [`ATOMIC_TEMP_INFIX`]; it is placed in the target's parent directory,
    /// or in `.` when the target path has no (non-empty) parent component.
    ///
    /// # Errors
    /// Returns an error when `target` has no file-name component, when the
    /// parent directories cannot be created, or when the scratch file cannot
    /// be created.
    ///
    /// NOTE(review): `tempfile` creates named temp files with owner-only
    /// permissions on Unix by default, and the target is the renamed scratch
    /// file, so the target presumably ends up with those permissions —
    /// confirm this is intended.
    fn create(target: &Path) -> Result<Self, OxenError> {
        let file_name = target.file_name().ok_or_else(|| {
            let reason = std::io::Error::other("target path has no filename component");
            OxenError::file_create_error(target, reason)
        })?;

        // An empty parent (bare file name such as "HEAD") means "current
        // directory"; there is nothing to create in that case.
        let scratch_dir = match target.parent() {
            Some(dir) if !dir.as_os_str().is_empty() => {
                std::fs::create_dir_all(dir)
                    .map_err(|err| OxenError::file_create_error(dir, err))?;
                dir
            }
            _ => Path::new("."),
        };

        let prefix = format!("{}{}", file_name.to_string_lossy(), ATOMIC_TEMP_INFIX);
        let mut builder = tempfile::Builder::new();
        builder.prefix(&prefix).suffix("");
        let tmp = builder.tempfile_in(scratch_dir).map_err(|err| {
            // The real name is not known on failure, so report a placeholder.
            let attempted = scratch_dir.join(format!("{prefix}<random>"));
            OxenError::file_create_error(attempted, err)
        })?;

        Ok(Self {
            tmp,
            target: target.to_path_buf(),
        })
    }

    /// Gives mutable access to the scratch file so callers can write to it.
    fn as_writer(&mut self) -> &mut tempfile::NamedTempFile {
        &mut self.tmp
    }

    /// Sets the scratch file's modification time to `mtime` through its open
    /// handle; the access time is not passed and is left alone.
    fn set_mtime(&mut self, mtime: SystemTime) -> Result<(), OxenError> {
        let handle = self.tmp.as_file();
        let modified = filetime::FileTime::from_system_time(mtime);
        filetime::set_file_handle_times(handle, None, Some(modified))?;
        Ok(())
    }

    /// Publishes the scratch file at the target path.
    ///
    /// Steps, in order: fsync the scratch file, detach it from automatic
    /// cleanup, rename it onto the target, then make a best-effort fsync of
    /// the parent directory.
    ///
    /// # Errors
    /// Returns an error when the file fsync, the detach, or the rename fails.
    /// If the rename fails the scratch file is removed (best effort) before
    /// the error is returned. Failures of the parent-directory fsync are only
    /// logged at debug level.
    fn commit(self) -> Result<(), OxenError> {
        let Self { tmp, target } = self;
        tmp.as_file().sync_all()?;

        // After `keep` the file is no longer deleted automatically, so every
        // failure path below has to clean up by hand. The handle stays bound
        // so the file remains open until this function returns.
        let (_open_handle, scratch_path) = match tmp.keep() {
            Ok(kept) => kept,
            Err(err) => {
                return Err(OxenError::file_rename_error(
                    err.file.path(),
                    &target,
                    err.error,
                ));
            }
        };

        std::fs::rename(&scratch_path, &target).map_err(|err| {
            let _ = std::fs::remove_file(&scratch_path);
            OxenError::file_rename_error(&scratch_path, &target, err)
        })?;

        // Best effort only: the rename has already succeeded, so a failure to
        // sync the directory entry is logged rather than reported.
        let Some(parent) = target.parent().filter(|p| !p.as_os_str().is_empty()) else {
            return Ok(());
        };
        match std::fs::File::open(parent) {
            Err(err) => log::debug!(
                "AtomicTempFile::commit: could not open parent {parent:?} for fsync: {err}"
            ),
            Ok(dir) => {
                if let Err(err) = dir.sync_all() {
                    log::debug!(
                        "AtomicTempFile::commit: parent fsync failed for {parent:?}: {err}"
                    );
                }
            }
        }
        Ok(())
    }
}
/// Builder describing a write to `target` that goes through a scratch file.
///
/// Constructing and configuring the builder performs no I/O; the work happens
/// in the terminal methods (`write`, `stream`, `stream_async`, `copy_from`),
/// each of which consumes the builder.
#[must_use = "AtomicFile does nothing until a terminal method (write/stream/stream_async/copy_from) is called"]
#[derive(Debug, Clone)]
pub struct AtomicFile {
// Path the data should end up at.
target: PathBuf,
// When set, the written bytes must hash to this value or the write fails
// with `OxenError::HashMismatch`.
expected_hash: Option<MerkleHash>,
// When set, applied as the file's modification time before it is committed.
mtime: Option<SystemTime>,
}
impl AtomicFile {
pub fn new(target: impl AsRef<Path>) -> Self {
Self {
target: target.as_ref().to_path_buf(),
expected_hash: None,
mtime: None,
}
}
pub fn with_hash(mut self, expected_hash: MerkleHash) -> Self {
self.expected_hash = Some(expected_hash);
self
}
pub fn with_mtime(mut self, mtime: SystemTime) -> Self {
self.mtime = Some(mtime);
self
}
pub fn write(self, contents: &[u8]) -> Result<(), OxenError> {
if let Some(expected) = self.expected_hash {
let actual = MerkleHash::new(xxh3_128(contents));
if actual != expected {
return Err(OxenError::HashMismatch {
path: self.target,
expected,
actual,
});
}
}
let mut tmp = AtomicTempFile::create(&self.target)?;
tmp.as_writer().write_all(contents)?;
if let Some(mtime) = self.mtime {
tmp.set_mtime(mtime)?;
}
tmp.commit()
}
pub fn stream<R>(self, reader: &mut R) -> Result<(), OxenError>
where
R: std::io::Read + ?Sized,
{
let mut tmp = AtomicTempFile::create(&self.target)?;
let actual_hash = {
let mut buf_writer =
std::io::BufWriter::with_capacity(constants::STREAMING_BUF_SIZE, tmp.as_writer());
let actual = if self.expected_hash.is_some() {
let mut hashing = HashingReader::new(reader);
std::io::copy(&mut hashing, &mut buf_writer)?;
Some(MerkleHash::new(hashing.digest128()))
} else {
std::io::copy(reader, &mut buf_writer)?;
None
};
buf_writer.flush()?;
actual
};
if let (Some(expected), Some(actual)) = (self.expected_hash, actual_hash)
&& actual != expected
{
return Err(OxenError::HashMismatch {
path: self.target,
expected,
actual,
});
}
if let Some(mtime) = self.mtime {
tmp.set_mtime(mtime)?;
}
tmp.commit()
}
pub async fn stream_async<R>(self, reader: &mut R) -> Result<(), OxenError>
where
R: tokio::io::AsyncRead + Unpin + ?Sized,
{
enum StreamMsg {
Chunk(Bytes),
Err(std::io::Error),
Eof,
}
let Self {
target,
expected_hash,
mtime,
} = self;
let mut reader = tokio::io::BufReader::with_capacity(constants::STREAMING_BUF_SIZE, reader);
let (tx, mut rx) = tokio::sync::mpsc::channel::<StreamMsg>(2);
let writer = tokio::task::spawn_blocking(move || -> Result<(), OxenError> {
let mut tmp = AtomicTempFile::create(&target)?;
let mut hasher = expected_hash.map(|_| Xxh3::new());
let mut saw_eof = false;
while let Some(msg) = rx.blocking_recv() {
match msg {
StreamMsg::Chunk(chunk) => {
if let Some(h) = hasher.as_mut() {
h.update(&chunk);
}
tmp.as_writer().write_all(&chunk)?;
}
StreamMsg::Err(err) => return Err(err.into()),
StreamMsg::Eof => {
saw_eof = true;
break;
}
}
}
if !saw_eof {
return Err(std::io::Error::other(format!(
"AtomicFile::stream_async: stream to {target:?} ended before EOF; \
partial write discarded"
))
.into());
}
if let (Some(expected), Some(hasher)) = (expected_hash, hasher) {
let actual = MerkleHash::new(hasher.digest128());
if actual != expected {
return Err(OxenError::HashMismatch {
path: target,
expected,
actual,
});
}
}
if let Some(mtime) = mtime {
tmp.set_mtime(mtime)?;
}
tmp.commit()
});
let mut buf = BytesMut::with_capacity(constants::STREAMING_BUF_SIZE);
loop {
if buf.capacity() == 0 {
buf.reserve(constants::STREAMING_BUF_SIZE);
}
match reader.read_buf(&mut buf).await {
Ok(0) => {
let _ = tx.send(StreamMsg::Eof).await;
break;
}
Ok(_) => {
let chunk = buf.split().freeze();
if tx.send(StreamMsg::Chunk(chunk)).await.is_err() {
break;
}
}
Err(err) => {
let _ = tx.send(StreamMsg::Err(err)).await;
break;
}
}
}
drop(tx);
writer.await?
}
pub fn copy_from(self, src: &Path) -> Result<(), OxenError> {
let mut src_file = File::open(src).map_err(|err| OxenError::file_error(src, err))?;
self.stream(&mut src_file)
}
}
// Unit tests for `AtomicFile` / `AtomicTempFile`.
//
// Each test runs inside `test::run_empty_dir_test` or
// `test::run_empty_dir_test_async`; presumably these hand the closure a fresh,
// empty directory (the "directory is empty" assertions rely on that) — see the
// harness in `crate::test` to confirm.
#[cfg(test)]
mod tests {
use super::{ATOMIC_TEMP_INFIX, AtomicFile, AtomicTempFile};
use crate::error::OxenError;
use crate::model::MerkleHash;
use crate::test;
use std::time::{Duration, SystemTime};
// Building and configuring the builder must not create the target or any
// scratch file.
#[test]
fn test_atomic_file_new_makes_no_filesystem_changes() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let target = dir.join("never_created");
let mtime = SystemTime::UNIX_EPOCH + Duration::from_secs(1_700_000_000);
let bogus_hash = MerkleHash::new(0xdead_beef_dead_beef_dead_beef_dead_beefu128);
// `let _ =` discards the builder explicitly (the type is `#[must_use]`).
let _ = AtomicFile::new(&target)
.with_hash(bogus_hash)
.with_mtime(mtime);
assert!(
!target.exists(),
"AtomicFile::new() must not create the target"
);
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert!(
names.is_empty(),
"AtomicFile::new() / with_* leaked something onto disk: {names:?}",
);
Ok(())
})
}
// `write` followed by a read returns the same bytes.
#[tokio::test]
async fn test_atomic_write_round_trip() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("file.txt");
AtomicFile::new(&target).write(b"hello world")?;
let contents = tokio::fs::read(&target).await?;
assert_eq!(contents, b"hello world");
Ok(())
})
.await
}
// `write` replaces the contents of a file that already exists.
#[tokio::test]
async fn test_atomic_write_overwrites_existing() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("file.txt");
tokio::fs::write(&target, b"old contents").await?;
AtomicFile::new(&target).write(b"new contents")?;
let contents = tokio::fs::read(&target).await?;
assert_eq!(contents, b"new contents");
Ok(())
})
.await
}
// `write` creates missing parent directories of the target.
#[tokio::test]
async fn test_atomic_write_creates_parent_dir() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("nested").join("deeper").join("file.txt");
AtomicFile::new(&target).write(b"x")?;
let contents = tokio::fs::read(&target).await?;
assert_eq!(contents, b"x");
Ok(())
})
.await
}
// Writing an empty buffer still produces an (empty) target file.
#[tokio::test]
async fn test_atomic_write_empty_contents() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("file.txt");
AtomicFile::new(&target).write(b"")?;
let contents = tokio::fs::read(&target).await?;
assert!(contents.is_empty());
Ok(())
})
.await
}
// A verified `write` with the correct hash commits and leaves exactly one
// directory entry (no scratch file left behind).
#[tokio::test]
async fn test_atomic_write_verified_commits_on_match() -> Result<(), OxenError> {
use xxhash_rust::xxh3::xxh3_128;
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload = b"the quick brown fox jumps over the lazy dog";
let expected = MerkleHash::new(xxh3_128(payload));
AtomicFile::new(&target)
.with_hash(expected)
.write(payload)?;
let written = tokio::fs::read(&target).await?;
assert_eq!(written, payload);
let mut entries = tokio::fs::read_dir(&dir).await?;
let mut names = Vec::new();
while let Some(entry) = entries.next_entry().await? {
names.push(entry.file_name());
}
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
Ok(())
})
.await
}
// A verified `write` with the wrong hash returns `HashMismatch` carrying the
// target path and both hashes, and leaves the directory empty.
#[tokio::test]
async fn test_atomic_write_verified_aborts_on_mismatch() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload = b"the quick brown fox jumps over the lazy dog";
let bogus_expected = MerkleHash::new(0xdead_beef_dead_beef_dead_beef_dead_beefu128);
let result = AtomicFile::new(&target)
.with_hash(bogus_expected)
.write(payload);
match result {
Err(OxenError::HashMismatch {
path,
expected,
actual,
}) => {
assert_eq!(path, target);
assert_eq!(expected, bogus_expected);
assert_ne!(actual, bogus_expected);
}
other => panic!("expected HashMismatch, got {other:?}"),
}
assert!(!tokio::fs::try_exists(&target).await?);
let mut entries = tokio::fs::read_dir(&dir).await?;
assert!(
entries.next_entry().await?.is_none(),
"directory should be empty after mismatched verified write"
);
Ok(())
})
.await
}
// The scratch file is named after the target (basename + infix), lives in
// the target's directory, and disappears when the wrapper is dropped
// without being committed.
#[tokio::test]
async fn test_atomic_temp_file_name_pattern() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("HEAD");
let tmp = AtomicTempFile::create(&target)?;
let temp_path = tmp.tmp.path().to_path_buf();
let temp_name = temp_path
.file_name()
.expect("temp path has a filename")
.to_string_lossy();
assert!(
temp_name.starts_with("HEAD"),
"temp name {temp_name:?} should start with the target basename"
);
assert!(
temp_name.contains(ATOMIC_TEMP_INFIX),
"temp name {temp_name:?} should contain {:?}",
ATOMIC_TEMP_INFIX
);
assert_eq!(temp_path.parent(), Some(dir.as_path()));
// Dropping without `commit` must remove the scratch file.
drop(tmp); assert!(!tokio::fs::try_exists(temp_path).await?);
Ok(())
})
.await
}
// `stream_async` writes a 200 000-byte payload intact and leaves no
// scratch file.
#[tokio::test]
async fn test_atomic_stream_streams() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let mut reader = std::io::Cursor::new(payload.clone());
AtomicFile::new(&target).stream_async(&mut reader).await?;
let written = tokio::fs::read(&target).await?;
assert_eq!(written, payload);
let mut entries = tokio::fs::read_dir(&dir).await?;
let mut names = Vec::new();
while let Some(entry) = entries.next_entry().await? {
names.push(entry.file_name());
}
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
Ok(())
})
.await
}
// When the async reader fails after yielding some bytes, `stream_async`
// returns an error and leaves neither target nor scratch file.
#[tokio::test]
async fn test_atomic_stream_cleans_up_on_read_failure() -> Result<(), OxenError> {
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};
// Yields one small chunk, then fails on every later poll.
struct FailingReader {
served_once: bool,
}
impl AsyncRead for FailingReader {
fn poll_read(
mut self: Pin<&mut Self>,
_: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if !self.served_once {
buf.put_slice(b"some-bytes");
self.served_once = true;
Poll::Ready(Ok(()))
} else {
Poll::Ready(Err(std::io::Error::other("simulated read failure")))
}
}
}
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let mut reader = FailingReader { served_once: false };
let result = AtomicFile::new(&target).stream_async(&mut reader).await;
assert!(result.is_err(), "expected the streaming write to fail");
assert!(!tokio::fs::try_exists(&target).await?);
let mut entries = tokio::fs::read_dir(&dir).await?;
assert!(
entries.next_entry().await?.is_none(),
"directory should be empty after failed write"
);
Ok(())
})
.await
}
// Blocking `stream` writes the payload intact; the only directory entry
// afterwards is the target itself.
#[test]
fn test_atomic_stream_sync_streams() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let mut reader = std::io::Cursor::new(payload.clone());
AtomicFile::new(&target).stream(&mut reader)?;
let written = std::fs::read(&target)?;
assert_eq!(written, payload);
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
assert_eq!(names[0], std::ffi::OsStr::new("blob.bin"));
Ok(())
})
}
// When the blocking reader fails after yielding some bytes, `stream`
// returns an error and leaves the directory empty.
#[test]
fn test_atomic_stream_sync_cleans_up_on_read_failure() -> Result<(), OxenError> {
// Yields one small chunk, then fails on every later read.
struct FailingReader {
served_once: bool,
}
impl std::io::Read for FailingReader {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if !self.served_once {
let bytes = b"some-bytes";
let n = bytes.len().min(buf.len());
buf[..n].copy_from_slice(&bytes[..n]);
self.served_once = true;
Ok(n)
} else {
Err(std::io::Error::other("simulated read failure"))
}
}
}
test::run_empty_dir_test(|dir| {
let target = dir.join("blob.bin");
let mut reader = FailingReader { served_once: false };
let result = AtomicFile::new(&target).stream(&mut reader);
assert!(result.is_err(), "expected the streaming write to fail");
assert!(
!target.exists(),
"target should not exist after failed write"
);
let leftover: Vec<_> = std::fs::read_dir(dir)?.filter_map(|e| e.ok()).collect();
assert!(
leftover.is_empty(),
"directory should be empty after failed write, got {} entries",
leftover.len()
);
Ok(())
})
}
// 32 writers race on the same target from blocking tasks. Every write must
// succeed, the final contents must be one writer's payload, and no scratch
// files may remain.
#[tokio::test]
async fn test_atomic_write_concurrent_writers() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("file.txt");
let n: usize = 32;
let mut handles = Vec::with_capacity(n);
for i in 0..n {
let target = target.clone();
let payload = format!("writer-{i}").into_bytes();
handles.push(tokio::task::spawn_blocking(move || {
AtomicFile::new(&target).write(&payload)
}));
}
for h in handles {
h.await
.expect("join should succeed")
.expect("AtomicFile::write should succeed");
}
let final_contents = tokio::fs::read(&target).await?;
let final_str = std::str::from_utf8(&final_contents).expect("contents should be utf-8");
assert!(
final_str.starts_with("writer-"),
"unexpected final contents: {final_str:?}",
);
let mut entries = tokio::fs::read_dir(&dir).await?;
let mut names = Vec::new();
while let Some(entry) = entries.next_entry().await? {
names.push(entry.file_name());
}
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
assert_eq!(names[0], std::ffi::OsStr::new("file.txt"));
Ok(())
})
.await
}
// Verified `stream_async` with the correct hash commits the payload and
// leaves a single directory entry.
#[tokio::test]
async fn test_atomic_stream_async_verified_commits_on_match() -> Result<(), OxenError> {
use xxhash_rust::xxh3::xxh3_128;
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let expected = MerkleHash::new(xxh3_128(&payload));
let mut reader = std::io::Cursor::new(payload.clone());
AtomicFile::new(&target)
.with_hash(expected)
.stream_async(&mut reader)
.await?;
let written = tokio::fs::read(&target).await?;
assert_eq!(written, payload);
let mut entries = tokio::fs::read_dir(&dir).await?;
let mut names = Vec::new();
while let Some(entry) = entries.next_entry().await? {
names.push(entry.file_name());
}
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
Ok(())
})
.await
}
// Verified `stream_async` with the wrong hash returns `HashMismatch` and
// leaves the directory empty.
#[tokio::test]
async fn test_atomic_stream_async_verified_aborts_on_mismatch() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let mut reader = std::io::Cursor::new(payload);
let bogus_expected = MerkleHash::new(0xdead_beef_dead_beef_dead_beef_dead_beefu128);
let result = AtomicFile::new(&target)
.with_hash(bogus_expected)
.stream_async(&mut reader)
.await;
match result {
Err(OxenError::HashMismatch {
path,
expected,
actual,
}) => {
assert_eq!(path, target);
assert_eq!(expected, bogus_expected);
assert_ne!(actual, bogus_expected);
}
other => panic!("expected HashMismatch, got {other:?}"),
}
assert!(!tokio::fs::try_exists(&target).await?);
let mut entries = tokio::fs::read_dir(&dir).await?;
assert!(
entries.next_entry().await?.is_none(),
"directory should be empty after aborted verified write"
);
Ok(())
})
.await
}
// Verified blocking `stream` with the correct hash commits the payload and
// leaves a single directory entry.
#[test]
fn test_atomic_stream_verified_commits_on_match() -> Result<(), OxenError> {
use xxhash_rust::xxh3::xxh3_128;
test::run_empty_dir_test(|dir| {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let expected = MerkleHash::new(xxh3_128(&payload));
let mut reader = std::io::Cursor::new(payload.clone());
AtomicFile::new(&target)
.with_hash(expected)
.stream(&mut reader)?;
let written = std::fs::read(&target)?;
assert_eq!(written, payload);
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert_eq!(names.len(), 1, "unexpected leftover files: {names:?}");
Ok(())
})
}
// Verified blocking `stream` with the wrong hash returns `HashMismatch` and
// leaves the directory empty.
#[test]
fn test_atomic_stream_verified_aborts_on_mismatch() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..50_000u32).flat_map(u32::to_le_bytes).collect();
let mut reader = std::io::Cursor::new(payload);
let bogus_expected = MerkleHash::new(0xdead_beef_dead_beef_dead_beef_dead_beefu128);
let result = AtomicFile::new(&target)
.with_hash(bogus_expected)
.stream(&mut reader);
match result {
Err(OxenError::HashMismatch {
path,
expected,
actual,
}) => {
assert_eq!(path, target);
assert_eq!(expected, bogus_expected);
assert_ne!(actual, bogus_expected);
}
other => panic!("expected HashMismatch, got {other:?}"),
}
assert!(!target.exists());
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert!(names.is_empty(), "directory should be empty: {names:?}");
Ok(())
})
}
// Fixed timestamp shared by the mtime tests; it has a non-zero nanosecond
// part, so the exact-equality checks below also cover sub-second precision.
fn fixed_mtime() -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::new(1_700_000_000, 123_456_789)
}
// `copy_from` with an mtime copies the bytes, applies exactly that mtime to
// the destination, and leaves only the source and destination on disk.
#[test]
fn test_atomic_copy_with_mtime_round_trip() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let src = dir.join("src.bin");
let dst = dir.join("dst.bin");
let payload: Vec<u8> = (0..5_000u32).flat_map(u32::to_le_bytes).collect();
std::fs::write(&src, &payload)?;
let mtime = fixed_mtime();
AtomicFile::new(&dst).with_mtime(mtime).copy_from(&src)?;
assert_eq!(std::fs::read(&dst)?, payload);
let actual = std::fs::metadata(&dst)?.modified()?;
assert_eq!(actual, mtime);
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert_eq!(names.len(), 2, "expected only src+dst, got: {names:?}");
Ok(())
})
}
// `copy_from` fully replaces an existing, longer destination file.
#[test]
fn test_atomic_copy_with_mtime_overwrites_existing() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let src = dir.join("src.bin");
let dst = dir.join("dst.bin");
std::fs::write(&src, b"new content")?;
std::fs::write(&dst, b"old content with different length")?;
AtomicFile::new(&dst)
.with_mtime(fixed_mtime())
.copy_from(&src)?;
assert_eq!(std::fs::read(&dst)?, b"new content");
Ok(())
})
}
// `copy_from` with a missing source fails and creates nothing on disk.
#[test]
fn test_atomic_copy_with_mtime_missing_source_leaves_no_scratch() -> Result<(), OxenError> {
test::run_empty_dir_test(|dir| {
let src = dir.join("does-not-exist.bin");
let dst = dir.join("dst.bin");
let result = AtomicFile::new(&dst)
.with_mtime(fixed_mtime())
.copy_from(&src);
assert!(result.is_err(), "expected error for missing source");
assert!(!dst.exists());
let names: Vec<_> = std::fs::read_dir(dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert!(names.is_empty(), "directory should be empty: {names:?}");
Ok(())
})
}
// `stream_async` with an mtime writes the payload and applies exactly that
// mtime to the target.
#[tokio::test]
async fn test_atomic_stream_async_with_mtime_round_trip() -> Result<(), OxenError> {
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let payload: Vec<u8> = (0..40_000u32).flat_map(u32::to_le_bytes).collect();
let mut reader = std::io::Cursor::new(payload.clone());
let mtime = fixed_mtime();
AtomicFile::new(&target)
.with_mtime(mtime)
.stream_async(&mut reader)
.await?;
assert_eq!(tokio::fs::read(&target).await?, payload);
let actual = tokio::fs::metadata(&target).await?.modified()?;
assert_eq!(actual, mtime);
Ok(())
})
.await
}
// With an mtime configured, a failing async reader still results in an
// error, no target, and no leftover scratch file.
#[tokio::test]
async fn test_atomic_stream_async_with_mtime_cleans_up_on_read_failure() -> Result<(), OxenError>
{
use tokio::io::AsyncRead;
// Yields one small chunk, then fails on every later poll.
struct FailingReader {
yielded: bool,
}
impl AsyncRead for FailingReader {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
if !self.yielded {
buf.put_slice(b"partial");
self.yielded = true;
std::task::Poll::Ready(Ok(()))
} else {
std::task::Poll::Ready(Err(std::io::Error::other("synthetic read failure")))
}
}
}
test::run_empty_dir_test_async(|dir| async move {
let target = dir.join("blob.bin");
let mut reader = FailingReader { yielded: false };
let result = AtomicFile::new(&target)
.with_mtime(fixed_mtime())
.stream_async(&mut reader)
.await;
assert!(result.is_err(), "expected reader failure to surface");
assert!(
!target.exists(),
"target must not appear when stream aborts"
);
let stragglers: Vec<_> = std::fs::read_dir(&dir)?
.filter_map(|e| e.ok().map(|e| e.file_name()))
.collect();
assert!(stragglers.is_empty(), "leftover scratch: {stragglers:?}");
Ok(())
})
.await
}
}