use bevy_persistence_database::bevy::components::Guid;
use bevy_persistence_database::bevy::params::query::PersistentQuery;
use bevy_persistence_database::core::db::{
BEVY_PERSISTENCE_DATABASE_METADATA_FIELD, BEVY_PERSISTENCE_DATABASE_VERSION_FIELD,
DocumentKind, PersistenceError, TransactionOperation,
};
use bevy_persistence_database::core::persist::Persist;
use bevy_persistence_database::bevy::query::PersistenceQuery;
use bevy_persistence_database::core::session::commit_sync;
use serde_json::json;
use crate::common::*;
use bevy::prelude::With;
use bevy_persistence_database_derive::db_matrix_test;
#[db_matrix_test]
fn test_update_conflict_is_detected() {
let (db, _container) = setup();
let mut app = setup_test_app(db.clone(), None);
let entity_id = app.world_mut().spawn(Health { value: 100 }).id();
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Initial commit failed");
let guid = app.world().get::<Guid>(entity_id).unwrap();
let key = guid.id().to_string();
let (doc, version) = run_async(db.fetch_document(TEST_STORE, &key))
.expect("Failed to fetch document")
.expect("Document should exist");
let mut updated_doc = doc.clone();
if let Some(obj) = updated_doc.as_object_mut() {
obj.insert("Health".to_string(), json!({"value": 150}));
let meta = obj
.entry(BEVY_PERSISTENCE_DATABASE_METADATA_FIELD.to_string())
.or_insert_with(|| json!({}));
if let Some(meta_obj) = meta.as_object_mut() {
meta_obj.insert(
BEVY_PERSISTENCE_DATABASE_VERSION_FIELD.to_string(),
json!(version + 1),
);
}
}
run_async(
db.execute_transaction(vec![TransactionOperation::UpdateDocument {
store: TEST_STORE.to_string(),
kind: DocumentKind::Entity,
key: key.clone(),
expected_current_version: version,
patch: updated_doc,
}]),
)
.expect("Direct DB update failed");
app.world_mut().get_mut::<Health>(entity_id).unwrap().value = 200;
app.update();
let result = commit_sync(&mut app, db.clone(), TEST_STORE);
assert!(matches!(result, Err(PersistenceError::Conflict { .. })));
}
#[db_matrix_test]
fn test_delete_conflict_is_detected() {
let (db, _container) = setup();
let mut app = setup_test_app(db.clone(), None);
let entity_id = app.world_mut().spawn(Health { value: 100 }).id();
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Initial commit failed");
let guid = app.world().get::<Guid>(entity_id).unwrap();
let key = guid.id().to_string();
let (doc, version) = run_async(db.fetch_document(TEST_STORE, &key))
.expect("Failed to fetch document")
.expect("Document should exist");
let mut updated_doc = doc.clone();
if let Some(obj) = updated_doc.as_object_mut() {
let meta = obj
.entry(BEVY_PERSISTENCE_DATABASE_METADATA_FIELD.to_string())
.or_insert_with(|| json!({}));
if let Some(meta_obj) = meta.as_object_mut() {
meta_obj.insert(
BEVY_PERSISTENCE_DATABASE_VERSION_FIELD.to_string(),
json!(version + 1),
);
}
}
run_async(
db.execute_transaction(vec![TransactionOperation::UpdateDocument {
store: TEST_STORE.to_string(),
kind: DocumentKind::Entity,
key: key.clone(),
expected_current_version: version,
patch: updated_doc,
}]),
)
.expect("Direct version update failed");
app.world_mut().entity_mut(entity_id).despawn();
app.update();
let result = commit_sync(&mut app, db.clone(), TEST_STORE);
assert!(matches!(result, Err(PersistenceError::Conflict { .. })));
}
#[db_matrix_test]
fn test_conflict_strategy_last_write_wins() {
let (db, _container) = setup();
let mut app = setup_test_app(db.clone(), None);
let entity_id = app
.world_mut()
.spawn((Health { value: 100 }, Position { x: 0.0, y: 0.0 }))
.id();
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Initial commit failed");
let guid = app.world().get::<Guid>(entity_id).unwrap();
let key = guid.id().to_string();
let (doc, version) = run_async(db.fetch_document(TEST_STORE, &key))
.expect("Failed to fetch document")
.expect("Document should exist");
let mut updated_doc = doc.clone();
if let Some(obj) = updated_doc.as_object_mut() {
obj.insert("Health".to_string(), json!({"value": 150}));
let meta = obj
.entry(BEVY_PERSISTENCE_DATABASE_METADATA_FIELD.to_string())
.or_insert_with(|| json!({}));
if let Some(meta_obj) = meta.as_object_mut() {
meta_obj.insert(
BEVY_PERSISTENCE_DATABASE_VERSION_FIELD.to_string(),
json!(version + 1),
);
}
}
run_async(
db.execute_transaction(vec![TransactionOperation::UpdateDocument {
store: TEST_STORE.to_string(),
kind: DocumentKind::Entity,
key: key.clone(),
expected_current_version: version,
patch: updated_doc,
}]),
)
.expect("Direct DB update failed");
app.world_mut().get_mut::<Position>(entity_id).unwrap().x = 50.0;
app.update();
let result = commit_sync(&mut app, db.clone(), TEST_STORE);
assert!(matches!(result, Err(PersistenceError::Conflict { .. })));
#[derive(bevy::prelude::Resource)]
struct KeyRes(String);
fn reload_by_key(
pq: PersistentQuery<(&Health, &Position), (With<Health>, With<Position>)>,
key: bevy::prelude::Res<KeyRes>,
) {
let _ = pq.filter(Guid::key_field().eq(&key.0)).load();
}
app.insert_resource(KeyRes(key.clone()));
app.add_systems(bevy::prelude::Update, reload_by_key);
app.update();
let reloaded_entity = {
let mut q = app.world_mut().query::<(bevy::prelude::Entity, &Guid)>();
q.iter(&app.world())
.find(|(_, g)| g.id() == key)
.map(|(e, _)| e)
.expect("reloaded entity not found")
};
assert_eq!(
app.world().get::<Health>(reloaded_entity).unwrap().value,
150
);
app.world_mut()
.get_mut::<Position>(reloaded_entity)
.unwrap()
.x = 50.0;
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Second commit failed");
let (final_doc, _) = run_async(db.fetch_document(TEST_STORE, &key))
.expect("Failed to fetch final document")
.expect("Document should exist");
let health_value = final_doc
.get("Health")
.and_then(|h| h.get("value"))
.and_then(|v| v.as_i64())
.expect("Health value not found");
assert_eq!(health_value, 150);
let position_x = final_doc
.get("Position")
.and_then(|p| p.get("x"))
.and_then(|v| v.as_f64())
.expect("Position x not found");
assert_eq!(position_x, 50.0);
}
#[db_matrix_test]
fn test_conflict_strategy_three_way_merge() {
let (db, _container) = setup();
let mut app = setup_test_app(db.clone(), None);
let base_health = Health { value: 100 };
let base_position = Position { x: 0.0, y: 0.0 };
let entity_id = app
.world_mut()
.spawn((base_health.clone(), base_position.clone()))
.id();
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Initial commit failed");
let guid = app.world().get::<Guid>(entity_id).unwrap();
let key = guid.id().to_string();
let (doc, version) = run_async(db.fetch_document(TEST_STORE, &key))
.unwrap()
.unwrap();
let mut updated_doc = doc.clone();
if let Some(obj) = updated_doc.as_object_mut() {
obj.insert("Health".to_string(), json!({"value": 150}));
let meta = obj
.entry(BEVY_PERSISTENCE_DATABASE_METADATA_FIELD.to_string())
.or_insert_with(|| json!({}));
if let Some(meta_obj) = meta.as_object_mut() {
meta_obj.insert(
BEVY_PERSISTENCE_DATABASE_VERSION_FIELD.to_string(),
json!(version + 1),
);
}
}
run_async(
db.execute_transaction(vec![TransactionOperation::UpdateDocument {
store: TEST_STORE.to_string(),
kind: DocumentKind::Entity,
key: key.clone(),
expected_current_version: version,
patch: updated_doc,
}]),
)
.expect("Direct DB update for Health failed");
let my_position_change = Position { x: 50.0, y: 50.0 };
app.world_mut().get_mut::<Position>(entity_id).unwrap().x = my_position_change.x;
app.world_mut().get_mut::<Position>(entity_id).unwrap().y = my_position_change.y;
app.update();
let result = commit_sync(&mut app, db.clone(), TEST_STORE);
assert!(matches!(result, Err(PersistenceError::Conflict { .. })));
let loaded = run_async(
PersistenceQuery::new(db.clone(), TEST_STORE)
.with::<Health>()
.with::<Position>()
.filter(Guid::key_field().eq(&key))
.fetch_into(app.world_mut()),
);
assert_eq!(loaded.len(), 1);
let reloaded_entity = loaded[0];
let their_health = app.world().get::<Health>(reloaded_entity).unwrap();
assert_eq!(their_health.value, 150);
app.world_mut()
.get_mut::<Position>(reloaded_entity)
.unwrap()
.x = my_position_change.x;
app.world_mut()
.get_mut::<Position>(reloaded_entity)
.unwrap()
.y = my_position_change.y;
app.update();
commit_sync(&mut app, db.clone(), TEST_STORE).expect("Merged commit failed");
let (final_doc, _) = run_async(db.fetch_document(TEST_STORE, &key))
.unwrap()
.unwrap();
let final_health: Health = serde_json::from_value(final_doc[Health::name()].clone()).unwrap();
let final_position: Position =
serde_json::from_value(final_doc[Position::name()].clone()).unwrap();
assert_eq!(final_health.value, 150, "Health change was not merged");
assert_eq!(final_position.x, 50.0, "Position change was not merged");
assert_eq!(final_position.y, 50.0, "Position change was not merged");
}