use moonpool_core::{OpenOptions, StorageFile, StorageProvider};
use moonpool_sim::{SimWorld, StorageConfiguration};
use std::net::IpAddr;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
/// Textual form of the address handed to the storage provider by the tests below.
const TEST_IP_STR: &str = "127.0.0.1";

/// Parses [`TEST_IP_STR`] into an [`IpAddr`].
///
/// # Panics
///
/// Panics with "valid IP" if the constant cannot be parsed as an IP address.
fn test_ip() -> IpAddr {
    let parsed = TEST_IP_STR.parse::<IpAddr>();
    parsed.expect("valid IP")
}
/// Builds a current-thread Tokio [`tokio::runtime::LocalRuntime`] with the
/// I/O and time drivers enabled and default local options.
///
/// # Panics
///
/// Panics with "Failed to build local runtime" if the runtime cannot be built.
fn local_runtime() -> tokio::runtime::LocalRuntime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_io().enable_time();
    let built = builder.build_local(Default::default());
    built.expect("Failed to build local runtime")
}
/// Creates a fresh [`SimWorld`] with the `fast_local` storage configuration applied.
fn fast_sim() -> SimWorld {
    let storage_config = StorageConfiguration::fast_local();
    let mut world = SimWorld::new();
    world.set_storage_config(storage_config);
    world
}
/// Opens five differently named files for writing and keeps every handle
/// alive at the same time before dropping them together.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_multiple_files_open_simultaneously() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            // Collect the handles so all five files stay open simultaneously.
            let mut open_files = Vec::new();
            for index in 0..5 {
                let name = format!("multi_{}.txt", index);
                let opened = storage.open(&name, OpenOptions::create_write()).await?;
                open_files.push(opened);
            }
            assert_eq!(open_files.len(), 5);
            drop(open_files);
            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Alternates writes between two open files and then reads each file back,
/// checking that each one holds only the chunks written to it.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_interleaved_writes() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            let mut file_a = storage
                .open("file_a.txt", OpenOptions::create_write())
                .await?;
            let mut file_b = storage
                .open("file_b.txt", OpenOptions::create_write())
                .await?;

            // Each round writes one chunk to A, then one chunk to B.
            for (chunk_a, chunk_b) in [(b"A1", b"B1"), (b"A2", b"B2"), (b"A3", b"B3")] {
                file_a.write_all(chunk_a).await?;
                file_b.write_all(chunk_b).await?;
            }

            file_a.sync_all().await?;
            file_b.sync_all().await?;
            drop(file_a);
            drop(file_b);

            // Re-open each file read-only and compare the full content.
            let mut reader_a = storage
                .open("file_a.txt", OpenOptions::read_only())
                .await?;
            let mut contents_a = Vec::new();
            reader_a.read_to_end(&mut contents_a).await?;
            assert_eq!(&contents_a, b"A1A2A3", "File A should have correct content");

            let mut reader_b = storage
                .open("file_b.txt", OpenOptions::read_only())
                .await?;
            let mut contents_b = Vec::new();
            reader_b.read_to_end(&mut contents_b).await?;
            assert_eq!(&contents_b, b"B1B2B3", "File B should have correct content");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Copies two 6-byte chunks from one file into another while both are open,
/// then verifies the target holds exactly the first twelve source bytes.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_write_one_read_another() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            // Prepare the source file and close it before the copy starts.
            {
                let mut source = storage
                    .open("read_source.txt", OpenOptions::create_write())
                    .await?;
                source.write_all(b"source data for reading").await?;
                source.sync_all().await?;
            }

            let mut target = storage
                .open("write_target.txt", OpenOptions::create_write())
                .await?;
            let mut reader = storage
                .open("read_source.txt", OpenOptions::read_only())
                .await?;

            // Two read/write rounds through the same 6-byte buffer.
            let mut chunk = [0u8; 6];
            for _ in 0..2 {
                reader.read_exact(&mut chunk).await?;
                target.write_all(&chunk).await?;
            }
            target.sync_all().await?;
            drop(target);
            drop(reader);

            let mut verifier = storage
                .open("write_target.txt", OpenOptions::read_only())
                .await?;
            let mut copied = Vec::new();
            verifier.read_to_end(&mut copied).await?;
            assert_eq!(&copied, b"source data ");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Opens the same file three times in sequence: create and write, re-open
/// read/write and append at the end, then re-open read-only and verify the
/// combined content.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_sequential_opens_same_file() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            // First open: create the file and write the initial bytes.
            {
                let mut creator = storage
                    .open("reopen.txt", OpenOptions::create_write())
                    .await?;
                creator.write_all(b"initial").await?;
                creator.sync_all().await?;
            }

            // Second open: seek to the end and append.
            {
                let mut appender = storage
                    .open("reopen.txt", OpenOptions::new().read(true).write(true))
                    .await?;
                appender.seek(std::io::SeekFrom::End(0)).await?;
                appender.write_all(b"_appended").await?;
                appender.sync_all().await?;
            }

            // Third open: read everything back.
            let mut reader = storage
                .open("reopen.txt", OpenOptions::read_only())
                .await?;
            let mut contents = Vec::new();
            reader.read_to_end(&mut contents).await?;
            assert_eq!(&contents, b"initial_appended");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Creates three files, deletes one and renames another, then checks that
/// the third file still exists with its original content.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_file_independence() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            for index in 0..3 {
                let name = format!("independent_{}.txt", index);
                let mut created = storage.open(&name, OpenOptions::create_write()).await?;
                let payload = format!("Content for file {}", index);
                created.write_all(payload.as_bytes()).await?;
                created.sync_all().await?;
                drop(created);
            }

            // Mutate files 1 and 0; file 2 is left untouched.
            storage.delete("independent_1.txt").await?;
            storage
                .rename("independent_0.txt", "renamed_0.txt")
                .await?;

            let old_name_present = storage.exists("independent_0.txt").await?;
            assert!(!old_name_present, "Old name should not exist");

            let new_name_present = storage.exists("renamed_0.txt").await?;
            assert!(new_name_present, "New name should exist");

            let deleted_present = storage.exists("independent_1.txt").await?;
            assert!(!deleted_present, "Deleted file should not exist");

            let untouched_present = storage.exists("independent_2.txt").await?;
            assert!(untouched_present, "Unchanged file should exist");

            let mut reader = storage
                .open("independent_2.txt", OpenOptions::read_only())
                .await?;
            let mut contents = Vec::new();
            reader.read_to_end(&mut contents).await?;
            assert_eq!(&contents, b"Content for file 2");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Seeks two open files to different offsets and checks that each read
/// starts from that file's own position.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_independent_file_positions() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            // Both files get the same ten digits so only the offset matters.
            let names = ["pos_a.txt", "pos_b.txt"];
            for name in &names {
                let mut created = storage.open(name, OpenOptions::create_write()).await?;
                created.write_all(b"0123456789").await?;
                created.sync_all().await?;
                drop(created);
            }

            let mut reader_a = storage.open("pos_a.txt", OpenOptions::read_only()).await?;
            let mut reader_b = storage.open("pos_b.txt", OpenOptions::read_only()).await?;

            reader_a.seek(std::io::SeekFrom::Start(2)).await?;
            reader_b.seek(std::io::SeekFrom::Start(7)).await?;

            let mut window_a = [0u8; 3];
            let mut window_b = [0u8; 3];
            reader_a.read_exact(&mut window_a).await?;
            reader_b.read_exact(&mut window_b).await?;

            assert_eq!(&window_a, b"234", "File A should read from position 2");
            assert_eq!(&window_b, b"789", "File B should read from position 7");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Creates twenty files, confirms all exist, deletes the even-numbered ones,
/// and confirms that half remain.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_many_files() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            let file_count = 20;

            for index in 0..file_count {
                let name = format!("many_{:03}.txt", index);
                let mut created = storage.open(&name, OpenOptions::create_write()).await?;
                let payload = format!("File number {}", index);
                created.write_all(payload.as_bytes()).await?;
                created.sync_all().await?;
                drop(created);
            }

            let mut present = 0;
            for index in 0..file_count {
                let name = format!("many_{:03}.txt", index);
                if storage.exists(&name).await? {
                    present += 1;
                }
            }
            assert_eq!(present, file_count, "All {} files should exist", file_count);

            // Delete every file with an even index.
            for index in 0..file_count {
                if index % 2 == 0 {
                    let name = format!("many_{:03}.txt", index);
                    storage.delete(&name).await?;
                }
            }

            let mut survivors = 0;
            for index in 0..file_count {
                let name = format!("many_{:03}.txt", index);
                if storage.exists(&name).await? {
                    survivors += 1;
                }
            }
            assert_eq!(survivors, file_count / 2, "Half the files should remain");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}
/// Writes a file once, then reads it in full through two separate read-only
/// handles opened one after the other, expecting identical content.
///
/// The workload runs as a local task; the outer loop steps the simulation
/// while events are pending and yields so the task can make progress.
#[test]
fn test_sequential_reads_same_file() {
    local_runtime().block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let task = tokio::task::spawn_local(async move {
            {
                let mut writer = storage
                    .open("shared_read.txt", OpenOptions::create_write())
                    .await?;
                writer.write_all(b"shared content for reading").await?;
                writer.sync_all().await?;
            }

            // First reader is closed before the second one is opened.
            let mut first_pass = Vec::new();
            {
                let mut reader = storage
                    .open("shared_read.txt", OpenOptions::read_only())
                    .await?;
                reader.read_to_end(&mut first_pass).await?;
            }

            let mut second_pass = Vec::new();
            {
                let mut reader = storage
                    .open("shared_read.txt", OpenOptions::read_only())
                    .await?;
                reader.read_to_end(&mut second_pass).await?;
            }

            assert_eq!(first_pass, second_pass, "Both reads should see same content");
            assert_eq!(&first_pass, b"shared content for reading");

            Ok::<_, std::io::Error>(())
        });

        // Drain pending simulation events, then yield to the spawned task.
        loop {
            if task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }

        task.await.expect("task panicked").expect("io error");
    });
}