use fluxmap::db::{Database, IsolationLevel};
use fluxmap::error::FluxError;
use std::sync::Arc;
#[tokio::test]
async fn test_write_skew_under_snapshot_isolation() {
let db: Arc<Database<String, i64>> = Arc::new(
Database::builder()
.isolation_level(IsolationLevel::Snapshot)
.build()
.await
.unwrap(),
);
let handle = db.handle();
handle.insert("x".to_string(), 100).await.unwrap();
handle.insert("y".to_string(), 100).await.unwrap();
drop(handle);
let db1 = db.clone();
let t1 = tokio::spawn(async move {
let mut handle = db1.handle();
handle.transaction(|h| Box::pin(async move {
let x = h.get(&"x".to_string()).unwrap().unwrap();
let y = h.get(&"y".to_string()).unwrap().unwrap();
if (*x + *y) >= 150 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
h.insert("x".to_string(), *x - 150).await?; Ok(())
} else {
panic!("Insufficient funds, test logic error");
}
})).await
});
let db2 = db.clone();
let t2 = tokio::spawn(async move {
let mut handle = db2.handle();
handle.transaction(|h| Box::pin(async move {
let x = h.get(&"x".to_string()).unwrap().unwrap();
let y = h.get(&"y".to_string()).unwrap().unwrap();
if (*x + *y) >= 150 {
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
h.insert("y".to_string(), *y - 150).await?; Ok(())
} else {
panic!("Insufficient funds, test logic error");
}
})).await
});
let res1: Result<(), FluxError> = t1.await.unwrap();
let res2: Result<(), FluxError> = t2.await.unwrap();
assert!(res1.is_ok(), "T1 should commit successfully under SI");
assert!(res2.is_ok(), "T2 should commit successfully under SI");
let final_handle = db.handle();
let final_x = final_handle.get(&"x".to_string()).unwrap().unwrap();
let final_y = final_handle.get(&"y".to_string()).unwrap().unwrap();
assert_eq!(*final_x, -50);
assert_eq!(*final_y, -50);
assert!(
(*final_x + *final_y) < 0,
"Write skew occurred: final balance is negative!"
);
}
#[tokio::test]
async fn test_write_skew_is_prevented_by_serializable() {
let db: Arc<Database<String, i64>> = Arc::new(
Database::builder()
.isolation_level(IsolationLevel::Serializable) .build()
.await
.unwrap(),
);
let handle = db.handle();
handle.insert("x".to_string(), 100).await.unwrap();
handle.insert("y".to_string(), 100).await.unwrap();
drop(handle);
let db1 = db.clone();
let t1 = tokio::spawn(async move {
let mut handle = db1.handle();
handle.transaction(|h| Box::pin(async move {
let x = h.get(&"x".to_string()).unwrap().unwrap();
let y = h.get(&"y".to_string()).unwrap().unwrap();
if (*x + *y) >= 150 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
h.insert("x".to_string(), *x - 150).await?;
Ok(())
} else {
panic!("Insufficient funds, test logic error");
}
})).await
});
let db2 = db.clone();
let t2 = tokio::spawn(async move {
let mut handle = db2.handle();
handle.transaction(|h| Box::pin(async move {
let x = h.get(&"x".to_string()).unwrap().unwrap();
let y = h.get(&"y".to_string()).unwrap().unwrap();
if (*x + *y) >= 150 {
tokio::time::sleep(std::time::Duration::from_millis(25)).await;
h.insert("y".to_string(), *y - 150).await?;
Ok(())
} else {
panic!("Insufficient funds, test logic error");
}
})).await
});
let res1: Result<(), FluxError> = t1.await.unwrap();
let res2: Result<(), FluxError> = t2.await.unwrap();
let success1 = res1.is_ok();
let success2 = res2.is_ok();
assert_ne!(success1, success2, "Exactly one transaction should succeed and one should fail. res1: {:?}, res2: {:?}", res1, res2);
if success1 {
assert_eq!(res2.unwrap_err(), FluxError::SerializationConflict);
} else {
assert_eq!(res1.unwrap_err(), FluxError::SerializationConflict);
}
let final_handle = db.handle();
let final_x = final_handle.get(&"x".to_string()).unwrap().unwrap();
let final_y = final_handle.get(&"y".to_string()).unwrap().unwrap();
assert!(
(*final_x + *final_y) >= 0,
"Invariant maintained: final balance is non-negative"
);
if success1 { assert_eq!(*final_x, -50);
assert_eq!(*final_y, 100);
} else { assert_eq!(*final_x, 100);
assert_eq!(*final_y, -50);
}
}