use super::commit::{
CommitCompleted, CommitStatus, TriggerCommit, handle_commit_completed, handle_commit_trigger,
};
use super::dirty_tracking::{
auto_despawn_tracking_resource_system, auto_despawn_tracking_system,
};
use super::ecs_plumbing::{
apply_deferred_world_ops, insert_initial_immediate_world_ptr, publish_immediate_world_ptr,
};
use super::listeners::{commit_event_listener, init_commit_listeners};
use super::runtime::{TokioRuntime, ensure_task_pools};
use crate::bevy::params::query::{InFlightQueries, PersistenceQueryCache};
use crate::bevy::world_access::DeferredWorldOperations;
use crate::core::db::connection::DatabaseConnectionResource;
use crate::core::db::DatabaseConnection;
use crate::bevy::registration::COMPONENT_REGISTRY;
use crate::core::session::PersistenceSession;
use bevy::app::PluginGroupBuilder;
use bevy::prelude::*;
use std::any::TypeId;
use std::collections::HashSet;
use std::sync::Arc;
#[derive(SystemSet, Debug, Clone, PartialEq, Eq, Hash)]
pub enum PersistenceSystemSet {
ChangeDetection,
PreCommit,
Commit,
}
#[derive(Resource, Default)]
pub struct RegisteredPersistTypes {
pub types: HashSet<TypeId>,
}
#[derive(Resource, Clone)]
pub struct PersistencePluginConfig {
pub batching_enabled: bool,
pub commit_batch_size: usize,
pub thread_count: usize,
pub default_store: String,
}
impl Default for PersistencePluginConfig {
fn default() -> Self {
Self {
batching_enabled: true,
commit_batch_size: 1000,
thread_count: 4,
default_store: "default_store".to_string(),
}
}
}
#[derive(Clone)]
enum PersistenceBackend {
Static(Arc<dyn DatabaseConnection>),
}
pub struct PersistencePluginCore {
backend: PersistenceBackend,
config: PersistencePluginConfig,
}
impl PersistencePluginCore {
pub fn new(db: Arc<dyn DatabaseConnection>) -> Self {
Self {
backend: PersistenceBackend::Static(db),
config: PersistencePluginConfig::default(),
}
}
pub fn with_config(mut self, config: PersistencePluginConfig) -> Self {
self.config = config;
self
}
}
impl Plugin for PersistencePluginCore {
fn build(&self, app: &mut App) {
ensure_task_pools(app);
let db_conn = match &self.backend {
PersistenceBackend::Static(db) => db.clone(),
};
app.insert_resource(PersistenceSession::new());
app.insert_resource(self.config.clone());
app.insert_resource(DatabaseConnectionResource {
connection: db_conn.clone(),
});
app.init_resource::<RegisteredPersistTypes>();
app.add_message::<TriggerCommit>();
app.add_message::<CommitCompleted>();
app.init_resource::<CommitStatus>();
init_commit_listeners(app.world_mut());
app.insert_resource(TokioRuntime::shared());
app.init_resource::<PersistenceQueryCache>();
app.init_resource::<InFlightQueries>();
app.init_resource::<DeferredWorldOperations>();
insert_initial_immediate_world_ptr(app);
app.add_systems(Startup, publish_immediate_world_ptr);
app.add_systems(First, publish_immediate_world_ptr);
let registry = COMPONENT_REGISTRY.lock().unwrap();
let registrations = registry.len();
if registrations == 0 {
bevy::log::warn!(
"No #[persist] registrations detected; components/resources will not be persisted"
);
} else {
bevy::log::debug!(registrations, "Applying #[persist] registrations");
}
for reg_fn in registry.iter() {
reg_fn(app);
}
app.configure_sets(
PostUpdate,
(
PersistenceSystemSet::ChangeDetection,
PersistenceSystemSet::PreCommit,
PersistenceSystemSet::Commit,
)
.chain(),
);
app.add_systems(
PostUpdate,
(
apply_deferred_world_ops,
publish_immediate_world_ptr,
auto_despawn_tracking_system,
auto_despawn_tracking_resource_system,
)
.in_set(PersistenceSystemSet::ChangeDetection),
);
app.add_systems(
PostUpdate,
(commit_event_listener, handle_commit_trigger).in_set(PersistenceSystemSet::PreCommit),
);
app.add_systems(
PostUpdate,
handle_commit_completed.in_set(PersistenceSystemSet::Commit),
);
}
}
#[derive(Clone)]
pub struct PersistencePlugins {
backend: PersistenceBackend,
config: PersistencePluginConfig,
}
impl PersistencePlugins {
pub fn new(db: Arc<dyn DatabaseConnection>) -> Self {
Self {
backend: PersistenceBackend::Static(db),
config: PersistencePluginConfig::default(),
}
}
pub fn with_config(mut self, config: PersistencePluginConfig) -> Self {
self.config = config;
self
}
}
#[derive(Clone)]
struct PersistenceGuards;
impl Plugin for PersistenceGuards {
fn build(&self, app: &mut App) {
ensure_task_pools(app);
}
}
impl PluginGroup for PersistencePlugins {
fn build(self) -> PluginGroupBuilder {
let core = PersistencePluginCore::new(match self.backend {
PersistenceBackend::Static(db) => db,
})
.with_config(self.config.clone());
PluginGroupBuilder::start::<Self>()
.add(PersistenceGuards)
.add(core)
}
}