use crate::bevy::plugins::persistence_plugin::{TokioRuntime, TriggerCommit, register_commit_listener};
use crate::core::db::connection::{DatabaseConnection, PersistenceError};
use bevy::prelude::{App, info};
use std::sync::{Arc, atomic::{AtomicU64, Ordering}};
use tokio::sync::oneshot;
use tokio::time::timeout;
pub(super) static NEXT_CORRELATION_ID: AtomicU64 = AtomicU64::new(1);
pub async fn commit(
app: &mut App,
connection: Arc<dyn DatabaseConnection>,
store: impl Into<String>,
) -> Result<(), PersistenceError> {
let store = store.into();
if store.is_empty() {
return Err(PersistenceError::new("commit store must be provided"));
}
let correlation_id = NEXT_CORRELATION_ID.fetch_add(1, Ordering::Relaxed);
let (tx, mut rx) = oneshot::channel();
register_commit_listener(app.world_mut(), correlation_id, tx);
app.world_mut().write_message(TriggerCommit {
correlation_id: Some(correlation_id),
target_connection: connection,
store: store.clone(),
});
timeout(std::time::Duration::from_secs(60), async {
loop {
app.update();
match rx.try_recv() {
Ok(result) => {
info!("Received commit result for correlation ID {}", correlation_id);
return result;
}
Err(oneshot::error::TryRecvError::Empty) => {
tokio::task::yield_now().await;
}
Err(oneshot::error::TryRecvError::Closed) => {
return Err(PersistenceError::new(
"Commit channel closed unexpectedly. The commit listener might have panicked.",
));
}
}
}
})
.await
.map_err(|_| PersistenceError::new("Commit timed out after 60 seconds"))?
}
pub fn commit_sync(
app: &mut App,
connection: Arc<dyn DatabaseConnection>,
store: impl Into<String>,
) -> Result<(), PersistenceError> {
let rt = { app.world().resource::<TokioRuntime>().runtime.clone() };
rt.block_on(commit(app, connection, store))
}