use crate::common::{Health, TEST_STORE, make_app, run_async, setup_test_app};
use bevy::prelude::With;
use bevy::prelude::*;
use bevy_persistence_database::bevy::components::Guid;
use bevy_persistence_database::bevy::params::query::PersistentQuery;
use bevy_persistence_database::bevy::plugins::persistence_plugin::{
CommitStatus, PersistencePluginConfig,
};
use bevy_persistence_database::core::db::{
BEVY_PERSISTENCE_DATABASE_BEVY_TYPE_FIELD, BEVY_PERSISTENCE_DATABASE_METADATA_FIELD,
BEVY_PERSISTENCE_DATABASE_VERSION_FIELD, DocumentKind, MockDatabaseConnection,
PersistenceError, TransactionOperation,
};
use bevy_persistence_database::core::session::commit_sync;
use bevy_persistence_database_derive::db_matrix_test;
use std::sync::Arc;
use std::time::{Duration, Instant};
// Round-trip check for committing newly spawned entities.
//
// Spawns ten `Health` entities, commits them through `commit_sync`, and asserts
// that ten entities carry a `Guid` afterwards. A second app built on the same
// connection then runs a system that calls `PersistentQuery::load` and must end
// up with ten `Health` entities.
// NOTE(review): the `2` passed to `make_app` is presumably the commit batch
// size — confirm against `common::make_app`.
#[db_matrix_test]
fn test_successful_batch_commit_of_new_entities() {
    // `_guard` is held (not `_`) so whatever `setup` returns alongside the
    // connection lives until the end of the test.
    let (db, _guard) = setup();
    let mut source_app = make_app(db.clone(), 2);

    (0..10).for_each(|value| {
        source_app.world_mut().spawn(Health { value });
    });
    source_app.update();

    let commit_result = commit_sync(&mut source_app, db.clone(), TEST_STORE);
    assert!(commit_result.is_ok());

    // Count the entities that have a `Guid` component after the commit.
    let world = source_app.world_mut();
    let mut guid_query = world.query::<&Guid>();
    let count = guid_query.iter(world).count();
    assert_eq!(count, 10);

    // System for the second app: triggers a load of `Health` entities.
    fn load_health(mut persisted: PersistentQuery<&Health, With<Health>>) {
        let _ = persisted.load();
    }

    let mut reader_app = setup_test_app(db.clone(), None);
    reader_app.add_systems(bevy::prelude::Update, load_health);
    reader_app.update();

    let mut health_query = reader_app.world_mut().query::<&Health>();
    let loaded = health_query.iter(reader_app.world()).count();
    assert_eq!(loaded, 10);
}
// Commits five `Health` entities (values 0..5), then changes two of them,
// despawns two others and commits again.
//
// The second commit must succeed and leave `CommitStatus::Idle`. A second app
// that loads `Health` from the same connection must then see exactly three
// values: the two updated ones (100, 101) and the untouched one (2).
// NOTE(review): the `2` passed to `make_app` is presumably the commit batch
// size — confirm against `common::make_app`.
#[db_matrix_test]
fn test_batch_commit_with_updates_and_deletes() {
    let (db, _guard) = setup();
    let mut app = make_app(db.clone(), 2);

    let mut entities = Vec::with_capacity(5);
    for value in 0..5 {
        entities.push(app.world_mut().spawn(Health { value }).id());
    }
    app.update();
    commit_sync(&mut app, db.clone(), TEST_STORE).unwrap();

    // Update entities 0 and 1, despawn entities 3 and 4, leave entity 2 alone.
    for (entity, new_value) in [(entities[0], 100), (entities[1], 101)] {
        app.world_mut().get_mut::<Health>(entity).unwrap().value = new_value;
    }
    for entity in [entities[3], entities[4]] {
        app.world_mut().entity_mut(entity).despawn();
    }
    app.update();

    let second_commit = commit_sync(&mut app, db.clone(), TEST_STORE);
    assert!(second_commit.is_ok());
    assert_eq!(*app.world().resource::<CommitStatus>(), CommitStatus::Idle);

    // System for the second app: triggers a load of `Health` entities.
    fn load_health(mut persisted: PersistentQuery<&Health, With<Health>>) {
        let _ = persisted.load();
    }

    let mut reader_app = setup_test_app(db.clone(), None);
    reader_app.add_systems(bevy::prelude::Update, load_health);
    reader_app.update();

    let mut health_query = reader_app.world_mut().query::<&Health>();
    let vals: Vec<_> = health_query
        .iter(reader_app.world())
        .map(|health| health.value)
        .collect();
    assert_eq!(vals.len(), 3);
    assert!([100, 101, 2].iter().all(|expected| vals.contains(expected)));
}
/// Checks that a version conflict on one document fails the whole commit.
///
/// Five `Health` entities are committed, then the stored document of the third
/// one is updated directly through the connection with its version field set to
/// `ver + 1`. After every entity's `Health` is modified, the next `commit_sync`
/// must return `PersistenceError::Conflict` carrying that document's key, and
/// `CommitStatus` must be back at `Idle`.
#[test]
fn test_batch_commit_failure_propagates() {
    let (db, _guard) = crate::common::setup_sync();
    let mut app = make_app(db.clone(), 2);

    let entities: Vec<_> = (0..5)
        .map(|value| app.world_mut().spawn(Health { value }).id())
        .collect();
    app.update();
    commit_sync(&mut app, db.clone(), TEST_STORE).unwrap();

    // Key and current stored version of the third entity's document.
    let guid = app
        .world()
        .get::<Guid>(entities[2])
        .unwrap()
        .id()
        .to_string();
    let (_doc, stored_version) = run_async(db.fetch_document(TEST_STORE, &guid))
        .unwrap()
        .unwrap();

    // Out-of-band write that bumps the stored version behind the app's back.
    let version_bump = serde_json::json!({
        "_key": guid,
        BEVY_PERSISTENCE_DATABASE_METADATA_FIELD: {
            BEVY_PERSISTENCE_DATABASE_VERSION_FIELD: stored_version + 1,
            BEVY_PERSISTENCE_DATABASE_BEVY_TYPE_FIELD: "entity"
        }
    });
    let out_of_band_update = TransactionOperation::UpdateDocument {
        store: TEST_STORE.to_string(),
        kind: DocumentKind::Entity,
        key: guid.clone(),
        expected_current_version: stored_version,
        patch: version_bump,
    };
    run_async(db.execute_transaction(vec![out_of_band_update])).unwrap();

    // Touch every entity so all five are part of the next commit.
    for &entity in &entities {
        app.world_mut().get_mut::<Health>(entity).unwrap().value += 10;
    }
    app.update();

    let res = commit_sync(&mut app, db.clone(), TEST_STORE);
    assert!(matches!(res, Err(PersistenceError::Conflict { key }) if key == guid));
    assert_eq!(*app.world().resource::<CommitStatus>(), CommitStatus::Idle);
}
/// Wall-clock check that commit batches are executed concurrently.
///
/// The mocked `execute_transaction` sleeps 50 ms per call and must be called
/// exactly five times (ten entities, `commit_batch_size` of 2). The time spent
/// in `commit_sync` must be more than one batch delay, less than two batch
/// delays, and less than the sum of all five delays.
#[test]
fn test_concurrent_batch_execution() {
    let batch_count = 5;
    let batch_size = 2;
    let batch_delay = Duration::from_millis(50);

    let mut mock_db = MockDatabaseConnection::new();
    mock_db.expect_document_key_field().return_const("_key");
    mock_db
        .expect_execute_transaction()
        .times(batch_count)
        .returning(move |_ops| {
            // Every batch takes `batch_delay` and then succeeds with no results.
            Box::pin(async move {
                tokio::time::sleep(batch_delay).await;
                Ok(vec![])
            })
        });
    let db_arc = Arc::new(mock_db);

    let mut app = setup_test_app(
        db_arc.clone(),
        Some(PersistencePluginConfig {
            batching_enabled: true,
            commit_batch_size: batch_size,
            thread_count: 4,
            default_store: TEST_STORE.to_string(),
        }),
    );
    for value in 0..batch_count * batch_size {
        app.world_mut().spawn(Health {
            value: value as i32,
        });
    }
    app.update();

    // Measure only the commit itself.
    let started = Instant::now();
    let res = commit_sync(&mut app, db_arc.clone(), TEST_STORE);
    let elapsed = started.elapsed();
    assert!(res.is_ok());

    let total_sequential_delay = batch_delay * batch_count as u32;
    println!("Elapsed time for concurrent commit: {:?}", elapsed);
    println!(
        "Total sequential delay would be: {:?}",
        total_sequential_delay
    );

    assert!(
        elapsed < total_sequential_delay,
        "Concurrent execution was not faster than sequential."
    );
    assert!(
        elapsed > batch_delay,
        "Elapsed time should be at least one batch delay."
    );
    assert!(
        elapsed < batch_delay * 2,
        "Concurrent execution took too long."
    );
}
/// Checks that a failure in one batch fails the whole multi-batch commit.
///
/// Nine entities are committed with a `commit_batch_size` of 3, so the mocked
/// `execute_transaction` is expected to be called exactly three times. The
/// third call returns a simulated error. The test then asserts that
/// `commit_sync` returns a `PersistenceError::General` containing
/// "Simulated failure", that `CommitStatus` is back at `Idle`, and that no
/// entity has a `Guid` component.
#[test]
fn test_atomic_multi_batch_commit() {
    let batch_count = 3;
    // Single source of truth for both the plugin config and the entity count.
    let batch_size = 3;
    // Zero-based index of the `execute_transaction` call that fails (the last one).
    let batch_to_fail = 2;

    let mut db = MockDatabaseConnection::new();
    db.expect_document_key_field().return_const("_key");

    let mut call_count = 0;
    db.expect_execute_transaction()
        .times(batch_count)
        .returning(move |_| {
            // The index is captured when the call is made, not when the
            // future is polled.
            let current_batch = call_count;
            call_count += 1;
            Box::pin(async move {
                tokio::time::sleep(Duration::from_millis(20)).await;
                if current_batch == batch_to_fail {
                    Err(PersistenceError::new("Simulated failure in batch"))
                } else {
                    Ok(vec![])
                }
            })
        });
    let db_arc = Arc::new(db);

    let config = PersistencePluginConfig {
        batching_enabled: true,
        commit_batch_size: batch_size,
        thread_count: 2,
        default_store: TEST_STORE.to_string(),
    };
    let mut app = setup_test_app(db_arc.clone(), Some(config));

    let entity_count = batch_count * batch_size;
    for i in 0..entity_count {
        app.world_mut().spawn(Health { value: i as i32 });
    }
    app.update();

    let result = commit_sync(&mut app, db_arc.clone(), TEST_STORE);
    // One assertion covers both "is an error" and "is the simulated error",
    // and prints the actual error (or `None` on success) when it fails.
    assert!(
        matches!(&result, Err(PersistenceError::General(msg)) if msg.contains("Simulated failure")),
        "expected the simulated batch failure to surface as a General error; commit error was {:?}",
        result.as_ref().err()
    );
    assert_eq!(*app.world().resource::<CommitStatus>(), CommitStatus::Idle);

    let world_ref = app.world_mut();
    let guid_count = world_ref.query::<&Guid>().iter(world_ref).count();
    assert_eq!(
        guid_count, 0,
        "No entity should have a Guid after atomic failure"
    );
}
/// Checks that no batch exceeds the configured `commit_batch_size`.
///
/// With 25 entities and a batch size of 7, the mocked `execute_transaction`
/// must be called exactly `ceil(25 / 7) = 4` times, and every call must carry
/// at most 7 operations. The commit itself must succeed.
#[test]
fn test_batches_respect_config_max_ops() {
    use bevy_persistence_database::bevy::plugins::persistence_plugin::PersistencePluginConfig;
    use bevy_persistence_database::core::db::MockDatabaseConnection;
    use bevy_persistence_database::core::session::commit_sync;
    use std::sync::Arc;

    let batch_size = 7usize;
    let entity_count = 25usize;
    // Number of batches needed to cover every entity, rounding up.
    let expected_batches = entity_count.div_ceil(batch_size);

    let mut mock_db = MockDatabaseConnection::new();
    mock_db.expect_document_key_field().return_const("_key");
    mock_db
        .expect_execute_transaction()
        .times(expected_batches)
        .returning(move |ops| {
            let op_count = ops.len();
            assert!(
                op_count <= batch_size,
                "Batch too large: got {}, limit {}",
                op_count,
                batch_size
            );
            Box::pin(async { Ok(vec![]) })
        });
    let conn = Arc::new(mock_db);

    let mut app = setup_test_app(
        conn.clone(),
        Some(PersistencePluginConfig {
            batching_enabled: true,
            commit_batch_size: batch_size,
            thread_count: 4,
            default_store: TEST_STORE.to_string(),
        }),
    );
    (0..entity_count).for_each(|i| {
        app.world_mut().spawn(Health { value: i as i32 });
    });
    app.update();

    let res = commit_sync(&mut app, conn, TEST_STORE);
    assert!(res.is_ok(), "Commit should succeed with mocked DB");
}