use moonpool_core::{OpenOptions, StorageFile, StorageProvider};
use moonpool_sim::{SimWorld, StorageConfiguration};
use std::net::IpAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Textual form of the loopback address used as the process IP by the tests in this file.
const TEST_IP_STR: &str = "127.0.0.1";

/// Returns [`TEST_IP_STR`] parsed as an [`IpAddr`].
///
/// # Panics
///
/// Panics with the message `valid IP` if the constant fails to parse.
fn test_ip() -> IpAddr {
    let parsed: Result<IpAddr, _> = TEST_IP_STR.parse();
    parsed.expect("valid IP")
}
/// Builds a current-thread Tokio `LocalRuntime` with the I/O and time drivers
/// enabled, passing default options to `build_local`.
///
/// # Panics
///
/// Panics with `Failed to build local runtime` if the builder returns an error.
fn local_runtime() -> tokio::runtime::LocalRuntime {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_io().enable_time();
    let built = builder.build_local(Default::default());
    built.expect("Failed to build local runtime")
}
/// Creates a fresh `SimWorld` whose storage configuration is set to
/// `StorageConfiguration::fast_local()`.
fn fast_sim() -> SimWorld {
    let mut world = SimWorld::new();
    let storage = StorageConfiguration::fast_local();
    world.set_storage_config(storage);
    world
}
/// Smoke test for `simulate_crash_for_process`.
///
/// Creates `test.txt`, drives the simulation until the creating task has
/// finished, then triggers three crashes with the boolean flag set to
/// `true`, `false`, `true`. The test passes as long as nothing panics.
#[test]
fn test_simulate_crash_api_basic() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();
        let storage = world.storage_provider(test_ip());

        let create_task = tokio::task::spawn_local(async move {
            // Open-then-drop: the handle is released as soon as creation succeeds.
            drop(
                storage
                    .open("test.txt", OpenOptions::create_write())
                    .await?,
            );
            Ok::<(), std::io::Error>(())
        });

        // Drain every pending simulated event, then yield so the spawned
        // task gets polled; repeat until the task reports completion.
        loop {
            if create_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        create_task
            .await
            .expect("task panicked")
            .expect("io error");

        for close_files in [true, false, true] {
            world.simulate_crash_for_process(test_ip(), close_files);
        }
    });
}
/// Data that was written and then `sync_all`-ed must read back unchanged
/// after a simulated crash.
///
/// Phase 1 writes and syncs `synced.txt`; a crash is then simulated with the
/// flag set to `true`; phase 2 reads the file through a fresh provider and
/// compares the bytes with what was written.
#[test]
fn test_simulate_crash_synced_data_survives() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();
        let payload = b"This data is synced and should survive crash!";

        // Phase 1: write + sync.
        let writer_storage = world.storage_provider(test_ip());
        let write_task = tokio::task::spawn_local(async move {
            let mut out = writer_storage
                .open("synced.txt", OpenOptions::create_write())
                .await?;
            out.write_all(payload).await?;
            out.sync_all().await?;
            drop(out);
            Ok::<(), std::io::Error>(())
        });
        loop {
            if write_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let write_outcome = write_task.await.expect("task panicked");
        write_outcome.expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        // Phase 2: read everything back through a new provider.
        let reader_storage = world.storage_provider(test_ip());
        let read_task = tokio::task::spawn_local(async move {
            let mut input = reader_storage
                .open("synced.txt", OpenOptions::read_only())
                .await?;
            let mut contents = Vec::new();
            input.read_to_end(&mut contents).await?;
            Ok::<Vec<u8>, std::io::Error>(contents)
        });
        loop {
            if read_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let read_outcome = read_task.await.expect("task panicked");
        let recovered = read_outcome.expect("io error");

        assert_eq!(&recovered, payload, "Synced data should survive crash intact");
    });
}
/// Exercises a crash after an unsynced write with `crash_fault_probability`
/// set to `1.0`.
///
/// The test makes no assertion about the file's fate: it only reports which
/// of the four observed outcomes occurred (missing, empty, different bytes,
/// or identical bytes). It fails only if a task panics or an I/O call errors.
#[test]
fn test_simulate_crash_unsynced_data_behavior() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut storage_config = StorageConfiguration::fast_local();
        storage_config.crash_fault_probability = 1.0;
        let mut world = SimWorld::new();
        world.set_storage_config(storage_config);

        let original_data = b"This data is NOT synced";

        // Write without calling sync_all before the handle is dropped.
        let writer_storage = world.storage_provider(test_ip());
        let write_task = tokio::task::spawn_local(async move {
            let mut out = writer_storage
                .open("unsynced.txt", OpenOptions::create_write())
                .await?;
            out.write_all(original_data).await?;
            drop(out);
            Ok::<(), std::io::Error>(())
        });
        loop {
            if write_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        write_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        // Probe the file: `None` means it is gone, `Some(bytes)` is its content.
        let reader_storage = world.storage_provider(test_ip());
        let probe_task = tokio::task::spawn_local(async move {
            if reader_storage.exists("unsynced.txt").await? {
                let mut input = reader_storage
                    .open("unsynced.txt", OpenOptions::read_only())
                    .await?;
                let mut contents = Vec::new();
                input.read_to_end(&mut contents).await?;
                Ok::<Option<Vec<u8>>, std::io::Error>(Some(contents))
            } else {
                Ok(None)
            }
        });
        loop {
            if probe_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let probe = probe_task
            .await
            .expect("task panicked")
            .expect("io error");

        match probe {
            None => {
                println!("File doesn't exist after crash (expected with high crash probability)")
            }
            Some(bytes) => {
                if bytes.is_empty() {
                    println!("File is empty after crash")
                } else if bytes != original_data {
                    println!("Data corrupted after crash (expected)")
                } else {
                    println!(
                        "Data survived crash (can happen if pending writes were already flushed): {:?}",
                        String::from_utf8_lossy(&bytes)
                    )
                }
            }
        }
    });
}
/// Crashes with the flag set to `true` after creating (but not syncing)
/// `close_test.txt`, then queries whether the file exists.
///
/// The result is only printed; the test checks that the sequence completes
/// without a panic or I/O error.
#[test]
fn test_simulate_crash_close_files_true() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();

        let creator = world.storage_provider(test_ip());
        let create_task = tokio::task::spawn_local(async move {
            drop(
                creator
                    .open("close_test.txt", OpenOptions::create_write())
                    .await?,
            );
            Ok::<(), std::io::Error>(())
        });
        loop {
            if create_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        create_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        let checker = world.storage_provider(test_ip());
        let check_task = tokio::task::spawn_local(async move {
            let present = checker.exists("close_test.txt").await?;
            Ok::<bool, std::io::Error>(present)
        });
        loop {
            if check_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let present = check_task
            .await
            .expect("task panicked")
            .expect("io error");
        println!("File exists after crash (close_files=true): {}", present);
    });
}
/// Crashes with the flag set to `false` after creating (but not syncing)
/// `no_close_test.txt`, then queries whether the file exists.
///
/// The result is only printed; the test checks that the sequence completes
/// without a panic or I/O error.
#[test]
fn test_simulate_crash_close_files_false() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();

        let creator = world.storage_provider(test_ip());
        let create_task = tokio::task::spawn_local(async move {
            drop(
                creator
                    .open("no_close_test.txt", OpenOptions::create_write())
                    .await?,
            );
            Ok::<(), std::io::Error>(())
        });
        loop {
            if create_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        create_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), false);

        let checker = world.storage_provider(test_ip());
        let check_task = tokio::task::spawn_local(async move {
            let present = checker.exists("no_close_test.txt").await?;
            Ok::<bool, std::io::Error>(present)
        });
        loop {
            if check_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let present = check_task
            .await
            .expect("task panicked")
            .expect("io error");
        println!("File exists after crash (close_files=false): {}", present);
    });
}
/// Writes and syncs five files (`multi_0.txt` .. `multi_4.txt`), simulates a
/// crash with the flag set to `true`, and asserts that all five still exist.
#[test]
fn test_simulate_crash_multiple_files() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();

        let writer_storage = world.storage_provider(test_ip());
        let write_task = tokio::task::spawn_local(async move {
            for index in 0..5 {
                let path = format!("multi_{}.txt", index);
                let body = format!("File {} content", index);
                let mut out = writer_storage
                    .open(&path, OpenOptions::create_write())
                    .await?;
                out.write_all(body.as_bytes()).await?;
                out.sync_all().await?;
                drop(out);
            }
            Ok::<(), std::io::Error>(())
        });
        loop {
            if write_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        write_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        // Count how many of the five files are still reported as existing.
        let checker = world.storage_provider(test_ip());
        let count_task = tokio::task::spawn_local(async move {
            let mut surviving = 0;
            for index in 0..5 {
                let path = format!("multi_{}.txt", index);
                if checker.exists(&path).await? {
                    surviving += 1;
                }
            }
            Ok::<_, std::io::Error>(surviving)
        });
        loop {
            if count_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let surviving = count_task
            .await
            .expect("task panicked")
            .expect("io error");
        assert_eq!(surviving, 5, "All synced files should survive crash");
    });
}
/// Writes `First part` to `mid_write.txt` without syncing, with
/// `crash_fault_probability` set to `1.0`, then simulates a crash and checks
/// whether the file exists.
///
/// The existence result is only printed; the test verifies that the sequence
/// completes without a panic or I/O error.
#[test]
fn test_simulate_crash_during_write() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut storage_config = StorageConfiguration::fast_local();
        storage_config.crash_fault_probability = 1.0;
        let mut world = SimWorld::new();
        world.set_storage_config(storage_config);

        let writer_storage = world.storage_provider(test_ip());
        let write_task = tokio::task::spawn_local(async move {
            let mut out = writer_storage
                .open("mid_write.txt", OpenOptions::create_write())
                .await?;
            out.write_all(b"First part").await?;
            drop(out);
            Ok::<(), std::io::Error>(())
        });
        loop {
            if write_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        write_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        let checker = world.storage_provider(test_ip());
        let check_task = tokio::task::spawn_local(async move {
            let present = checker.exists("mid_write.txt").await?;
            Ok::<bool, std::io::Error>(present)
        });
        loop {
            if check_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let present = check_task
            .await
            .expect("task panicked")
            .expect("io error");
        println!("File exists after mid-write crash: {}", present);
    });
}
/// With `crash_fault_probability` set to `0.0`, data that was written and
/// synced must read back byte-for-byte after a simulated crash.
#[test]
fn test_simulate_crash_zero_corruption_probability() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut storage_config = StorageConfiguration::fast_local();
        storage_config.crash_fault_probability = 0.0;
        let mut world = SimWorld::new();
        world.set_storage_config(storage_config);

        let payload = b"Data with zero crash probability";

        let writer_storage = world.storage_provider(test_ip());
        let write_task = tokio::task::spawn_local(async move {
            let mut out = writer_storage
                .open("zero_crash.txt", OpenOptions::create_write())
                .await?;
            out.write_all(payload).await?;
            out.sync_all().await?;
            drop(out);
            Ok::<(), std::io::Error>(())
        });
        loop {
            if write_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        write_task
            .await
            .expect("task panicked")
            .expect("io error");

        world.simulate_crash_for_process(test_ip(), true);

        let reader_storage = world.storage_provider(test_ip());
        let read_task = tokio::task::spawn_local(async move {
            let mut input = reader_storage
                .open("zero_crash.txt", OpenOptions::read_only())
                .await?;
            let mut contents = Vec::new();
            input.read_to_end(&mut contents).await?;
            Ok::<Vec<u8>, std::io::Error>(contents)
        });
        loop {
            if read_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let recovered = read_task
            .await
            .expect("task panicked")
            .expect("io error");

        assert_eq!(
            &recovered, payload,
            "Data should be intact with 0% crash probability"
        );
    });
}
/// A synced file must still exist after ten back-to-back simulated crashes.
///
/// `repeated.txt` is written and `sync_all`-ed, then `simulate_crash_for_process`
/// is invoked ten times, alternating the boolean flag (`true` on even rounds,
/// `false` on odd rounds). Afterwards the file's existence is printed and
/// asserted, so a durability regression fails the test instead of only
/// changing its output.
#[test]
fn test_simulate_crash_repeated() {
    local_runtime().block_on(async {
        let mut sim = fast_sim();

        // Write and sync so the content is durable before any crash happens.
        let provider = sim.storage_provider(test_ip());
        let handle = tokio::task::spawn_local(async move {
            let mut file = provider
                .open("repeated.txt", OpenOptions::create_write())
                .await?;
            file.write_all(b"test data").await?;
            file.sync_all().await?;
            drop(file);
            Ok::<_, std::io::Error>(())
        });
        // Drain pending simulated events, then yield so the task gets polled.
        while !handle.is_finished() {
            while sim.pending_event_count() > 0 {
                sim.step();
            }
            tokio::task::yield_now().await;
        }
        handle.await.expect("task panicked").expect("io error");

        for round in 0..10 {
            let close_files = round % 2 == 0;
            sim.simulate_crash_for_process(test_ip(), close_files);
        }

        let provider2 = sim.storage_provider(test_ip());
        let handle2 =
            tokio::task::spawn_local(async move { provider2.exists("repeated.txt").await });
        while !handle2.is_finished() {
            while sim.pending_event_count() > 0 {
                sim.step();
            }
            tokio::task::yield_now().await;
        }
        let exists = handle2.await.expect("task panicked").expect("io error");
        println!("File exists after repeated crashes: {}", exists);
        // Same durability expectation the single-crash tests in this file
        // assert for synced files.
        assert!(exists, "Synced file should survive repeated crashes");
    });
}
/// Simulating crashes before any file exists must not prevent later file
/// creation.
///
/// Two crashes are triggered (flag `true`, then `false`) on a fresh
/// simulation; afterwards `after_crash.txt` is created and asserted to exist.
#[test]
fn test_simulate_crash_no_files() {
    let runtime = local_runtime();
    runtime.block_on(async {
        let mut world = fast_sim();

        for close_files in [true, false] {
            world.simulate_crash_for_process(test_ip(), close_files);
        }

        let storage = world.storage_provider(test_ip());
        let create_task = tokio::task::spawn_local(async move {
            drop(
                storage
                    .open("after_crash.txt", OpenOptions::create_write())
                    .await?,
            );
            storage.exists("after_crash.txt").await
        });
        loop {
            if create_task.is_finished() {
                break;
            }
            while world.pending_event_count() > 0 {
                world.step();
            }
            tokio::task::yield_now().await;
        }
        let created = create_task
            .await
            .expect("task panicked")
            .expect("io error");
        assert!(created, "Should be able to create files after crash");
    });
}