use bevy::ecs::query::{QueryData, QueryFilter};
use bevy::ecs::system::SystemParam;
use bevy::prelude::{Entity, Query, Res, World};
use crate::bevy::params::query::PersistenceQueryCache;
use crate::bevy::plugins::persistence_plugin::{PersistencePluginConfig, TokioRuntime};
use crate::bevy::world_access::ImmediateWorldPtr;
use crate::core::db::connection::DatabaseConnectionResource;
use crate::core::query::EdgeQuerySpecification;
use crate::core::session::PersistenceSession;
use std::any::TypeId;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
#[derive(SystemParam)]
pub struct PersistenceRelationshipHydrator<'w, 's> {
pub(crate) db: Res<'w, DatabaseConnectionResource>,
pub(crate) cache: Res<'w, PersistenceQueryCache>,
pub(crate) runtime: Res<'w, TokioRuntime>,
pub(crate) world_ptr: Option<Res<'w, ImmediateWorldPtr>>,
pub(crate) config: Res<'w, PersistencePluginConfig>,
pub(crate) _marker: PhantomData<&'s ()>,
}
impl<'w, 's> PersistenceRelationshipHydrator<'w, 's> {
pub fn load<R: Send + Sync + 'static>(&mut self, entity: Entity, depth: usize) {
self.load_many::<R>(&[entity], depth);
}
pub fn load_all(&mut self, entity: Entity, depth: usize) {
self.load_all_many(&[entity], depth);
}
pub fn load_many<R: Send + Sync + 'static>(&mut self, entities: &[Entity], depth: usize) {
if depth == 0 || entities.is_empty() {
return;
}
let Some(ptr) = &self.world_ptr else {
return;
};
let world = ptr.as_world_mut();
world.resource_scope(|world, mut session: bevy::prelude::Mut<PersistenceSession>| {
let Some(rel_name) = session.relationship_type_name(&TypeId::of::<R>()) else {
return;
};
let spec = self.build_spec(
world,
&mut session,
vec![TypeId::of::<R>()],
entities,
depth,
rel_name,
);
if let Some(spec) = spec {
self.run_spec(world, &mut session, spec, entities, depth);
}
});
world.flush();
}
pub fn load_all_many(&mut self, entities: &[Entity], depth: usize) {
if depth == 0 || entities.is_empty() {
return;
}
let Some(ptr) = &self.world_ptr else {
return;
};
let world = ptr.as_world_mut();
world.resource_scope(|world, mut session: bevy::prelude::Mut<PersistenceSession>| {
let all_types: Vec<TypeId> = session
.relationship_type_entries()
.into_iter()
.map(|(type_id, _)| type_id)
.collect();
for type_id in all_types {
let Some(rel_name) = session.relationship_type_name(&type_id) else {
continue;
};
let spec = self.build_spec(world, &mut session, vec![type_id], entities, depth, rel_name);
if let Some(spec) = spec {
self.run_spec(world, &mut session, spec, entities, depth);
}
}
});
world.flush();
}
pub fn load_for_typed<R: Send + Sync + 'static, Q: QueryData, F: QueryFilter>(
&mut self,
query: &Query<(Entity, Q), F>,
depth: usize,
) {
let entities: Vec<Entity> = query.iter().map(|(entity, _)| entity).collect();
self.load_many::<R>(&entities, depth);
}
pub fn load_for<Q: QueryData, F: QueryFilter>(
&mut self,
query: &Query<(Entity, Q), F>,
depth: usize,
) {
let entities: Vec<Entity> = query.iter().map(|(entity, _)| entity).collect();
self.load_all_many(&entities, depth);
}
#[cfg(feature = "bevy_many_relationship_edges")]
pub fn are_relationships_loaded<R: Send + Sync + 'static>(&self, entity: Entity) -> bool {
let Some(ptr) = &self.world_ptr else {
return false;
};
let world: &World = ptr.as_world();
world
.get::<bevy_many_relationships::OutgoingRelationships<R>>(entity)
.is_some()
}
#[cfg(not(feature = "bevy_many_relationship_edges"))]
pub fn are_relationships_loaded<R: bevy::prelude::Component>(&self, entity: Entity) -> bool {
let Some(ptr) = &self.world_ptr else {
return false;
};
let world: &World = ptr.as_world();
world.get::<R>(entity).is_some()
}
fn build_spec(
&self,
_world: &mut World,
session: &mut PersistenceSession,
relationship_type_ids: Vec<TypeId>,
entities: &[Entity],
depth: usize,
rel_name: &str,
) -> Option<EdgeQuerySpecification> {
let mut source_keys = Vec::new();
for entity in entities {
if let Some(key) = session.entity_key(*entity) {
source_keys.push(key.clone());
}
}
if source_keys.is_empty() {
return None;
}
let mut hasher = std::collections::hash_map::DefaultHasher::new();
for type_id in &relationship_type_ids {
type_id.hash(&mut hasher);
}
for key in &source_keys {
key.hash(&mut hasher);
}
depth.hash(&mut hasher);
let hash = hasher.finish();
if self.cache.contains(hash) {
return None;
}
self.cache.insert(hash);
Some(EdgeQuerySpecification {
store: self.config.default_store.clone(),
relationship_types: vec![rel_name.to_string()],
from_guids: source_keys,
to_guids: Vec::new(),
depth,
})
}
fn run_spec(
&self,
world: &mut World,
session: &mut PersistenceSession,
spec: EdgeQuerySpecification,
entities: &[Entity],
_depth: usize,
) {
let edges = match self.runtime.block_on(self.db.connection.query_edges(&spec)) {
Ok(edges) => edges,
Err(_) => return,
};
let Some(type_name) = spec.relationship_types.first() else {
return;
};
let Some(type_id) = session
.relationship_type_entries()
.into_iter()
.find_map(|(type_id, name)| if name == type_name { Some(type_id) } else { None })
else {
return;
};
let mut grouped: HashMap<String, Vec<(String, Option<serde_json::Value>)>> = HashMap::new();
for edge in edges {
grouped
.entry(edge.from_guid)
.or_default()
.push((edge.to_guid, edge.payload));
}
for entity in entities {
let Some(source_key) = session.entity_key(*entity).cloned() else {
continue;
};
let raw_targets = grouped.remove(&source_key).unwrap_or_default();
let mut resolved_targets = Vec::new();
for (target_key, payload) in raw_targets {
let target_entity = if let Some(existing) = session.entity_by_key(&target_key) {
if world.get_entity(existing).is_ok() {
Some(existing)
} else {
None
}
} else {
match self
.runtime
.block_on(self.db.connection.fetch_document(&spec.store, &target_key))
{
Ok(Some((doc, _))) => {
self.apply_fetched_document(world, session, &target_key, doc)
}
_ => None,
}
};
if let Some(target_entity) = target_entity {
resolved_targets.push((target_entity, payload));
}
}
let _ = session.apply_relationship_targets(type_id, world, *entity, resolved_targets);
}
}
fn apply_fetched_document(
&self,
world: &mut World,
session: &mut PersistenceSession,
key: &str,
doc: serde_json::Value,
) -> Option<Entity> {
let key_field = self.db.connection.document_key_field();
let entity = world
.query::<(Entity, &crate::bevy::components::Guid)>()
.iter(world)
.find_map(|(entity, guid)| if guid.id() == key { Some(entity) } else { None })
.or_else(|| session.entity_by_key(key))
.unwrap_or_else(|| world.spawn(crate::bevy::components::Guid::new(key.to_string())).id());
session.insert_entity_key(entity, key.to_string());
let mut materialized = doc;
if materialized.get(key_field).is_none() {
if let Some(map) = materialized.as_object_mut() {
map.insert(key_field.to_string(), serde_json::Value::String(key.to_string()));
}
}
for (name, deser) in session.component_deserializers() {
if let Some(value) = materialized.get(name) {
if deser(world, entity, value.clone()).is_err() {
return None;
}
}
}
Some(entity)
}
}