use async_channel::{Receiver, Sender};
use bevy_app::{App, AppExit, AppLabel, Plugin, SubApp};
use bevy_ecs::{
resource::Resource,
schedule::MainThreadExecutor,
world::{Mut, World},
};
use bevy_tasks::ComputeTaskPool;
use crate::RenderApp;
/// App label for the sub-app that [`PipelinedRenderingPlugin`] inserts into the main app.
///
/// The sub-app registered under this label has `renderer_extract` as its extract
/// function; its presence is also what `PipelinedRenderingPlugin::cleanup` checks
/// before it spawns the render thread.
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq, AppLabel)]
pub struct RenderExtractApp;
/// Pair of channel endpoints used to hand the render [`SubApp`] back and forth
/// between the thread that owns this resource and the render thread.
///
/// The `Drop` implementation of this type consults `render_app_in_render_thread`
/// to decide whether it has to wait for the render app to come back.
#[derive(Resource)]
pub struct RenderAppChannels {
// Sending half of the channel that carries the render app *to* the render thread.
app_to_render_sender: Sender<SubApp>,
// Receiving half of the channel that carries the render app *back from* the render thread.
render_to_app_receiver: Receiver<SubApp>,
// `true` after a successful `send_blocking`, reset to `false` after a successful `recv`.
render_app_in_render_thread: bool,
}
impl RenderAppChannels {
pub fn new(
app_to_render_sender: Sender<SubApp>,
render_to_app_receiver: Receiver<SubApp>,
) -> Self {
Self {
app_to_render_sender,
render_to_app_receiver,
render_app_in_render_thread: false,
}
}
pub fn send_blocking(&mut self, render_app: SubApp) {
self.app_to_render_sender.send_blocking(render_app).unwrap();
self.render_app_in_render_thread = true;
}
pub async fn recv(&mut self) -> Option<SubApp> {
let render_app = self.render_to_app_receiver.recv().await.ok()?;
self.render_app_in_render_thread = false;
Some(render_app)
}
}
impl Drop for RenderAppChannels {
    /// If the render app is currently out on the render thread, blocks until it is
    /// sent back (or receiving fails) and then discards the result.
    ///
    /// NOTE(review): presumably this exists so the render `SubApp` is dropped on the
    /// thread that drops this resource rather than on the render thread — confirm.
    fn drop(&mut self) {
        if !self.render_app_in_render_thread {
            return;
        }
        // A receive error is deliberately ignored: there is nothing left to wait for.
        let _ = self.render_to_app_receiver.recv_blocking();
    }
}
/// Plugin that runs the `RenderApp` sub-app's `update` on a dedicated thread.
///
/// In `build` it registers a [`RenderExtractApp`] sub-app (only if a `RenderApp`
/// sub-app exists). In `cleanup` it removes the `RenderApp` sub-app from the main
/// app, inserts a [`RenderAppChannels`] resource and spawns the thread that
/// repeatedly receives the render app, updates it and sends it back.
#[derive(Default)]
pub struct PipelinedRenderingPlugin;
impl Plugin for PipelinedRenderingPlugin {
    /// Registers the [`RenderExtractApp`] sub-app and a [`MainThreadExecutor`] resource.
    ///
    /// Does nothing when the app has no `RenderApp` sub-app.
    fn build(&self, app: &mut App) {
        if app.get_sub_app(RenderApp).is_some() {
            app.insert_resource(MainThreadExecutor::new());

            let mut extract_app = SubApp::new();
            extract_app.set_extract(renderer_extract);
            app.insert_sub_app(RenderExtractApp, extract_app);
        }
    }

    /// Moves the `RenderApp` sub-app out of the main app and starts the render thread.
    ///
    /// Does nothing when `build` did not register the [`RenderExtractApp`] sub-app.
    ///
    /// # Panics
    ///
    /// Panics if the `RenderApp` sub-app has already been removed, if the
    /// [`MainThreadExecutor`] resource is missing, or if the initial hand-off of the
    /// render app into the channel fails.
    fn cleanup(&self, app: &mut App) {
        if app.get_sub_app(RenderExtractApp).is_none() {
            return;
        }

        // One single-slot channel per direction.
        let (to_render_tx, to_render_rx) = async_channel::bounded::<SubApp>(1);
        let (to_app_tx, to_app_rx) = async_channel::bounded::<SubApp>(1);

        let mut render_app = app
            .remove_sub_app(RenderApp)
            .expect("Unable to get RenderApp. Another plugin may have removed the RenderApp before PipelinedRenderingPlugin");

        // Give the render world its own handle to the main world's executor resource.
        let main_thread_executor = app
            .world()
            .get_resource::<MainThreadExecutor>()
            .unwrap()
            .clone();
        render_app.world_mut().insert_resource(main_thread_executor);

        // Seed the render -> app channel so the first extract can receive the render
        // app straight away.
        to_app_tx.send_blocking(render_app).unwrap();

        app.insert_resource(RenderAppChannels::new(to_render_tx, to_app_rx));

        std::thread::spawn(move || {
            #[cfg(feature = "trace")]
            let _span = tracing::info_span!("render thread").entered();

            let task_pool = ComputeTaskPool::get();
            loop {
                // Wait for the render app inside a task-pool scope.
                let mut received = task_pool.scope(|s| {
                    s.spawn(async { to_render_rx.recv().await });
                });
                let mut render_app = match received.pop() {
                    Some(Ok(render_app)) => render_app,
                    // No result, or receiving failed: stop the thread.
                    _ => break,
                };

                {
                    #[cfg(feature = "trace")]
                    let _sub_app_span = tracing::info_span!("sub app", name = ?RenderApp).entered();
                    render_app.update();
                }

                // Return the render app; stop if sending it back fails.
                let sent_back = to_app_tx.send_blocking(render_app);
                if sent_back.is_err() {
                    break;
                }
            }

            tracing::debug!("exiting pipelined rendering thread");
        });
    }
}
fn renderer_extract(app_world: &mut World, _world: &mut World) {
app_world.resource_scope(|world, main_thread_executor: Mut<MainThreadExecutor>| {
world.resource_scope(|world, mut render_channels: Mut<RenderAppChannels>| {
if let Some(mut render_app) = ComputeTaskPool::get()
.scope_with_executor(true, Some(&*main_thread_executor.0), |s| {
s.spawn(async { render_channels.recv().await });
})
.pop()
.unwrap()
{
render_app.extract(world);
render_channels.send_blocking(render_app);
} else {
world.write_message(AppExit::error());
}
});
});
}