use super::types::*;
use std::collections::HashMap;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::FuturesUnordered;
use futures::{Future, Stream};
use slotmap::SlotMap;
use crate::error::AwsmError;
/// Scheduler-side bookkeeping for one material pipeline group.
pub struct MaterialState {
/// Definition supplied when the group was submitted.
pub def: MaterialDef,
/// Current compile status of the group.
pub status: PipelineGroupStatus,
/// Starts at 0 on submission and is bumped (wrapping) by
/// `PipelineScheduler::mark_material_pending_for_relaunch`. A `CompileResolution`
/// whose generation differs from this value is ignored when applied.
pub generation: u32,
}
/// Scheduler-side bookkeeping for one pass pipeline group (one entry per `PassKind`).
pub struct PassState {
/// Definition supplied by the most recent submission for this pass kind.
pub def: PassDef,
/// Current compile status of the group.
pub status: PipelineGroupStatus,
/// 0 for the first submission of a pass kind; each resubmission of the same kind
/// stores the previous value plus one (wrapping). A `CompileResolution` whose
/// generation differs from this value is ignored when applied.
pub generation: u32,
}
/// Outcome of a group-level compile future, applied to scheduler state by
/// `PipelineScheduler::poll_resolved`.
pub struct CompileResolution {
/// Group the result belongs to.
pub id: PipelineGroupId,
/// Generation the compile was launched for; compared against the group's current
/// generation so that stale results are discarded.
pub generation: u32,
/// `Ok(())` moves the group to `Ready`; `Err` moves it to `Failed` with this error.
pub result: Result<(), AwsmError>,
}
/// Boxed, pinned, `'static` future yielding a [`CompileResolution`]; the element type of
/// the scheduler's group-level in-flight set.
type PendingFuture = Pin<Box<dyn Future<Output = CompileResolution> + 'static>>;
/// Tag carried on [`PipelineCompileResolution::target`] describing which kind of
/// pipeline a compile result is for.
///
/// NOTE(review): the code that interprets these variants is not in this file; the
/// per-variant meaning (and what `msaa` / `mipmaps` select) should be confirmed
/// against that consumer.
#[derive(Clone)]
pub enum CompileInstallTarget {
ClassifyDynamic {
dispatch_hash: u64,
// `None` vs `Some(n)` — presumably "no MSAA" vs a sample count; TODO confirm.
msaa: Option<u32>,
},
OpaqueDynamic {
shader_id: awsm_materials::MaterialShaderId,
msaa: Option<u32>,
mipmaps: bool,
},
EdgeResolveShade {
shader_id: awsm_materials::MaterialShaderId,
mipmaps: bool,
},
EdgeResolveFinalBlend,
}
/// Outcome of one compute-pipeline compile future, handed back to the caller by
/// `PipelineScheduler::next_compile_resolution`.
pub struct PipelineCompileResolution {
/// Group the compile was launched on behalf of.
pub id: PipelineGroupId,
/// Generation value recorded when the compile was launched.
pub generation: u32,
/// Which kind of pipeline this result is for.
pub target: CompileInstallTarget,
/// Cache key of the compiled pipeline; the same key type is used for the scheduler's
/// compute-compile waiter lists.
pub cache_key: crate::pipelines::compute_pipeline::ComputePipelineCacheKey,
/// The compiled pipeline, or the JS value the compile failed with.
pub result: std::result::Result<web_sys::GpuComputePipeline, wasm_bindgen::JsValue>,
/// NOTE(review): presumably a human-readable compile message accompanying a failure —
/// confirm against the code that produces this struct.
pub compile_error: Option<String>,
}
/// Boxed, pinned, `'static` future yielding a [`PipelineCompileResolution`]; the element
/// type of the scheduler's pipeline-compile in-flight set.
type PipelineCompileFuture = Pin<Box<dyn Future<Output = PipelineCompileResolution> + 'static>>;
/// A status change queued by the scheduler and handed out by
/// `PipelineScheduler::drain_status_events`.
///
/// For `Failed` events the scheduler stores the real error on the group itself and puts
/// a placeholder error in the event; query the group's status for the actual cause.
#[derive(Debug)]
pub struct StatusEvent {
/// Group whose status changed.
pub id: PipelineGroupId,
/// Status the group moved to.
pub status: PipelineGroupStatus,
}
/// Snapshot of how far material compilation has progressed.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct CompileProgress {
    /// Number of material groups whose status is `Pending`.
    pub materials_pending: usize,
    /// Number of material groups whose status is `Ready`.
    pub materials_ready: usize,
    /// Number of material groups whose status is `Failed`.
    pub materials_failed: usize,
    /// Sum of outstanding sub-compile counts across all materials.
    pub in_flight_subcompiles: u32,
}

impl CompileProgress {
    /// Returns `true` when no material is pending and no sub-compile is in flight.
    pub fn is_idle(&self) -> bool {
        let Self {
            materials_pending,
            in_flight_subcompiles,
            ..
        } = *self;
        !(materials_pending != 0 || in_flight_subcompiles != 0)
    }

    /// Total number of material groups counted in this snapshot
    /// (pending + ready + failed).
    pub fn materials_total(&self) -> usize {
        let Self {
            materials_pending,
            materials_ready,
            materials_failed,
            ..
        } = *self;
        materials_pending + materials_ready + materials_failed
    }
}
/// Tracks the compile status of material and pass pipeline groups, the futures that are
/// compiling them, and the status events produced along the way.
pub struct PipelineScheduler {
// Material groups, keyed by the `MaterialId` handed out on submission.
materials: SlotMap<MaterialId, MaterialState>,
// One entry per pass kind; resubmitting a kind replaces its entry.
passes: HashMap<PassKind, PassState>,
// Group-level compile futures, drained by `poll_resolved`.
inflight: FuturesUnordered<PendingFuture>,
// Pipeline compile futures, polled one result at a time by `next_compile_resolution`.
pub(crate) inflight_compile: FuturesUnordered<PipelineCompileFuture>,
// Outstanding sub-compile count per material; an entry is removed once it reaches zero.
pending_subcompiles: HashMap<MaterialId, u32>,
// Materials waiting on the compile identified by each cache key.
inflight_compute_cache_waiters:
HashMap<crate::pipelines::compute_pipeline::ComputePipelineCacheKey, Vec<MaterialId>>,
// Status changes accumulated until `drain_status_events` takes them.
events: Vec<StatusEvent>,
}
impl Default for PipelineScheduler {
fn default() -> Self {
Self::new()
}
}
impl PipelineScheduler {
/// Creates a scheduler with no groups, no in-flight futures, and no queued events.
pub fn new() -> Self {
    let materials = SlotMap::with_key();
    let inflight = FuturesUnordered::new();
    let inflight_compile = FuturesUnordered::new();
    Self {
        materials,
        passes: HashMap::default(),
        inflight,
        inflight_compile,
        pending_subcompiles: HashMap::default(),
        inflight_compute_cache_waiters: HashMap::default(),
        events: Vec::default(),
    }
}
pub fn register_compute_compile_waiter(
&mut self,
cache_key: crate::pipelines::compute_pipeline::ComputePipelineCacheKey,
mid: MaterialId,
) -> bool {
*self.pending_subcompiles.entry(mid).or_insert(0) += 1;
let was_first = !self.inflight_compute_cache_waiters.contains_key(&cache_key);
self.inflight_compute_cache_waiters
.entry(cache_key)
.or_default()
.push(mid);
was_first
}
/// Removes and returns the materials waiting on `cache_key`.
///
/// Returns an empty vector when nothing is registered for the key. The per-material
/// sub-compile counters are not modified here.
pub fn take_compute_compile_waiters(
    &mut self,
    cache_key: &crate::pipelines::compute_pipeline::ComputePipelineCacheKey,
) -> Vec<MaterialId> {
    match self.inflight_compute_cache_waiters.remove(cache_key) {
        Some(waiters) => waiters,
        None => Vec::new(),
    }
}
/// Returns `true` when a waiter list is currently registered for `cache_key`.
pub fn has_compute_compile_waiter(
    &self,
    cache_key: &crate::pipelines::compute_pipeline::ComputePipelineCacheKey,
) -> bool {
    let waiters = &self.inflight_compute_cache_waiters;
    waiters.contains_key(cache_key)
}
/// Queues a pipeline compile future without changing any material's sub-compile count.
pub(crate) fn push_compile_future_no_count(&mut self, future: PipelineCompileFuture) {
    let queue = &mut self.inflight_compile;
    queue.push(future);
}
/// Queues a pipeline compile future on behalf of group `id`.
///
/// When `id` names a material, that material's outstanding sub-compile count is
/// incremented first; pass ids leave the counters untouched.
pub fn push_compile_future(&mut self, id: PipelineGroupId, future: PipelineCompileFuture) {
    match id {
        PipelineGroupId::Material(mid) => {
            let outstanding = self.pending_subcompiles.entry(mid).or_insert(0);
            *outstanding += 1;
        }
        PipelineGroupId::Pass(_) => {}
    }
    self.inflight_compile.push(future);
}
/// Polls the pipeline-compile set once, without blocking, using a no-op waker.
///
/// Returns the next finished resolution if one is available; returns `None` when
/// nothing has finished yet or the set is empty.
pub fn next_compile_resolution(&mut self) -> Option<PipelineCompileResolution> {
    let noop = futures::task::noop_waker();
    let mut cx = Context::from_waker(&noop);
    let polled = Pin::new(&mut self.inflight_compile).poll_next(&mut cx);
    if let Poll::Ready(Some(resolution)) = polled {
        Some(resolution)
    } else {
        None
    }
}
/// Records that one sub-compile for material `mid` has finished.
///
/// The material's outstanding count is decremented (never below zero; a material with
/// no recorded count is treated as zero). When the count is zero afterwards, its entry
/// is removed and, if the material exists and is still `Pending`, it is moved to
/// `Ready`, a [`StatusEvent`] is queued, and the transition is logged.
///
/// Returns `true` only when this call moved the material to `Ready`.
pub fn note_subcompile_complete(&mut self, mid: MaterialId) -> bool {
    let remaining = match self.pending_subcompiles.get_mut(&mid) {
        Some(count) => {
            *count = count.saturating_sub(1);
            *count
        }
        None => 0,
    };
    if remaining != 0 {
        return false;
    }
    self.pending_subcompiles.remove(&mid);

    let Some(state) = self.materials.get_mut(mid) else {
        return false;
    };
    if !state.status.is_pending() {
        return false;
    }
    state.status = PipelineGroupStatus::Ready;
    self.events.push(StatusEvent {
        id: PipelineGroupId::Material(mid),
        status: PipelineGroupStatus::Ready,
    });
    tracing::info!(
        target: "awsm_renderer::pipeline_readiness",
        "subcompile-complete: material({:?}) -> Ready",
        mid
    );
    true
}
/// Registers a batch of pipeline groups and returns their ids in submission order.
///
/// Every group starts out `Pending` and gets a `Pending` [`StatusEvent`] queued.
/// * Material groups are inserted as new entries with generation 0.
/// * Pass groups replace any existing entry for the same pass kind; the new entry's
///   generation is the previous entry's generation plus one (wrapping), or 0 when the
///   kind had no entry.
pub fn submit_pipeline_group_batch(
    &mut self,
    defs: Vec<PipelineGroupDef>,
) -> Vec<PipelineGroupId> {
    let ids: Vec<PipelineGroupId> = defs
        .into_iter()
        .map(|group_def| {
            let id = match group_def {
                PipelineGroupDef::Material(material_def) => {
                    let key = self.materials.insert(MaterialState {
                        def: material_def,
                        status: PipelineGroupStatus::Pending,
                        generation: 0,
                    });
                    PipelineGroupId::Material(key)
                }
                PipelineGroupDef::Pass(pass_def) => {
                    let kind = pass_def.kind();
                    let generation = match self.passes.get(&kind) {
                        Some(previous) => previous.generation.wrapping_add(1),
                        None => 0,
                    };
                    self.passes.insert(
                        kind,
                        PassState {
                            def: pass_def,
                            status: PipelineGroupStatus::Pending,
                            generation,
                        },
                    );
                    PipelineGroupId::Pass(kind)
                }
            };
            self.events.push(StatusEvent {
                id,
                status: PipelineGroupStatus::Pending,
            });
            id
        })
        .collect();
    tracing::info!(
        target: "awsm_renderer::pipeline_readiness",
        "submit_pipeline_group_batch: {} groups submitted",
        ids.len()
    );
    ids
}
/// Moves group `id` to `Ready`.
///
/// Does nothing when the group is unknown or already `Ready`. Otherwise the status is
/// updated, a `Ready` [`StatusEvent`] is queued, and the transition is logged.
pub fn mark_ready(&mut self, id: PipelineGroupId) {
    let label = match id {
        PipelineGroupId::Material(mid) => match self.materials.get_mut(mid) {
            Some(state) if !state.status.is_ready() => {
                state.status = PipelineGroupStatus::Ready;
                format!("material:{:?}", mid)
            }
            _ => return,
        },
        PipelineGroupId::Pass(kind) => match self.passes.get_mut(&kind) {
            Some(state) if !state.status.is_ready() => {
                state.status = PipelineGroupStatus::Ready;
                format!("pass:{:?}", kind)
            }
            _ => return,
        },
    };
    self.events.push(StatusEvent {
        id,
        status: PipelineGroupStatus::Ready,
    });
    tracing::info!(
        target: "awsm_renderer::pipeline_readiness",
        "mark_ready: {} -> Ready",
        label
    );
}
pub fn mark_failed(&mut self, id: PipelineGroupId, error: AwsmError) {
let label;
match id {
PipelineGroupId::Material(mid) => {
let Some(state) = self.materials.get_mut(mid) else {
return;
};
state.status = PipelineGroupStatus::Failed { error };
label = format!("material:{:?}", mid);
}
PipelineGroupId::Pass(kind) => {
let Some(state) = self.passes.get_mut(&kind) else {
return;
};
state.status = PipelineGroupStatus::Failed { error };
label = format!("pass:{:?}", kind);
}
}
self.events.push(StatusEvent {
id,
status: PipelineGroupStatus::Failed {
error: AwsmError::PipelineVariantNotCompiled("see scheduler state"),
},
});
tracing::warn!(
target: "awsm_renderer::pipeline_readiness",
"mark_failed: {} -> Failed",
label
);
}
/// Returns the current status of group `id`, or `None` when the group is unknown.
pub fn pipeline_group_status(&self, id: PipelineGroupId) -> Option<&PipelineGroupStatus> {
    let status = match id {
        PipelineGroupId::Material(mid) => &self.materials.get(mid)?.status,
        PipelineGroupId::Pass(kind) => &self.passes.get(&kind)?.status,
    };
    Some(status)
}
/// Builds a [`CompileProgress`] snapshot: material groups tallied by status, plus the
/// sum of all outstanding sub-compile counts. Pass groups are not counted.
pub fn compile_progress(&self) -> CompileProgress {
    let mut snapshot = CompileProgress::default();
    for material in self.materials.values() {
        let tally = match &material.status {
            PipelineGroupStatus::Pending => &mut snapshot.materials_pending,
            PipelineGroupStatus::Ready => &mut snapshot.materials_ready,
            PipelineGroupStatus::Failed { .. } => &mut snapshot.materials_failed,
        };
        *tally += 1;
    }
    snapshot.in_flight_subcompiles = self.pending_subcompiles.values().copied().sum();
    snapshot
}
/// Hands back every queued [`StatusEvent`] in the order it was queued, leaving the
/// scheduler's queue empty.
pub fn drain_status_events(&mut self) -> Vec<StatusEvent> {
    let mut drained = Vec::new();
    std::mem::swap(&mut drained, &mut self.events);
    drained
}
/// Removes material group `id` from the scheduler; a no-op when it is not present.
///
/// Only the material entry is removed: sub-compile counters, waiter lists and queued
/// events that mention `id` are left as they are.
pub fn drop_material_group(&mut self, id: MaterialId) {
    let _ = self.materials.remove(id);
}
/// Returns the current generation of material `mid`, or `None` when it is unknown.
pub fn material_generation(&self, mid: MaterialId) -> Option<u32> {
    let state = self.materials.get(mid)?;
    Some(state.generation)
}
/// Puts a material group back into `Pending` ahead of a relaunch.
///
/// Ignored for pass ids and for unknown materials. Otherwise the material's generation
/// is bumped (wrapping), its status becomes `Pending`, a `Pending` [`StatusEvent`] is
/// queued, and the transition is logged.
pub fn mark_material_pending_for_relaunch(&mut self, id: PipelineGroupId) {
    let mid = match id {
        PipelineGroupId::Material(mid) => mid,
        PipelineGroupId::Pass(_) => return,
    };
    if let Some(state) = self.materials.get_mut(mid) {
        state.generation = state.generation.wrapping_add(1);
        state.status = PipelineGroupStatus::Pending;
        self.events.push(StatusEvent {
            id,
            status: PipelineGroupStatus::Pending,
        });
        tracing::info!(
            target: "awsm_renderer::pipeline_readiness",
            "mark_material_pending_for_relaunch: material({:?}) -> Pending (bucket/pool relaunch)",
            mid
        );
    }
}
/// Returns the number of outstanding sub-compiles recorded for material `mid`
/// (0 when none are recorded).
pub fn pending_subcompile_count(&self, mid: MaterialId) -> u32 {
    match self.pending_subcompiles.get(&mid) {
        Some(&outstanding) => outstanding,
        None => 0,
    }
}
/// Returns the id of the first material (in slot-map iteration order) whose definition
/// has the given `shader_id`, or `None` when there is no such material.
pub fn find_material_by_shader_id(
    &self,
    shader_id: awsm_materials::MaterialShaderId,
) -> Option<MaterialId> {
    self.materials
        .iter()
        .find(|(_, state)| state.def.shader_id == shader_id)
        .map(|(mid, _)| mid)
}
/// Drains every group-level compile future that has already finished, applying each
/// result to scheduler state. Polling uses a no-op waker and never blocks.
///
/// Returns the number of resolutions taken off the in-flight set, including ones that
/// were discarded as stale.
pub fn poll_resolved(&mut self) -> usize {
    let noop = futures::task::noop_waker();
    let mut cx = Context::from_waker(&noop);
    let mut drained = 0;
    loop {
        match Pin::new(&mut self.inflight).poll_next(&mut cx) {
            Poll::Ready(Some(resolution)) => {
                self.apply_resolution(resolution);
                drained += 1;
            }
            Poll::Ready(None) | Poll::Pending => break,
        }
    }
    drained
}
fn apply_resolution(&mut self, r: CompileResolution) {
let new_status = match r.result {
Ok(()) => PipelineGroupStatus::Ready,
Err(e) => PipelineGroupStatus::Failed { error: e },
};
match r.id {
PipelineGroupId::Material(mid) => {
let Some(state) = self.materials.get_mut(mid) else {
return;
};
if state.generation != r.generation {
return;
}
let label = format!("{:?}", mid);
state.status = match &new_status {
PipelineGroupStatus::Ready => PipelineGroupStatus::Ready,
PipelineGroupStatus::Failed { error: _ } => match new_status {
PipelineGroupStatus::Failed { error } => {
PipelineGroupStatus::Failed { error }
}
_ => unreachable!(),
},
PipelineGroupStatus::Pending => PipelineGroupStatus::Pending,
};
tracing::info!(
target: "awsm_renderer::pipeline_readiness",
"transition: material {} -> {}",
label,
status_label(&state.status),
);
}
PipelineGroupId::Pass(kind) => {
let Some(state) = self.passes.get_mut(&kind) else {
return;
};
if state.generation != r.generation {
return;
}
let label = format!("{:?}", kind);
state.status = new_status;
tracing::info!(
target: "awsm_renderer::pipeline_readiness",
"transition: pass {} -> {}",
label,
status_label(&state.status),
);
}
}
let final_status = match r.id {
PipelineGroupId::Material(mid) => self.materials.get(mid).map(|s| match &s.status {
PipelineGroupStatus::Pending => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Pending,
},
PipelineGroupStatus::Ready => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Ready,
},
PipelineGroupStatus::Failed { error: _ } => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Failed {
error: AwsmError::PipelineVariantNotCompiled("see scheduler state"),
},
},
}),
PipelineGroupId::Pass(kind) => self.passes.get(&kind).map(|s| match &s.status {
PipelineGroupStatus::Pending => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Pending,
},
PipelineGroupStatus::Ready => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Ready,
},
PipelineGroupStatus::Failed { error: _ } => StatusEvent {
id: r.id,
status: PipelineGroupStatus::Failed {
error: AwsmError::PipelineVariantNotCompiled("see scheduler state"),
},
},
}),
};
if let Some(ev) = final_status {
self.events.push(ev);
}
}
}
/// Short, static name of a status variant, used in log lines.
fn status_label(s: &PipelineGroupStatus) -> &'static str {
    if let PipelineGroupStatus::Pending = s {
        return "Pending";
    }
    if let PipelineGroupStatus::Ready = s {
        return "Ready";
    }
    "Failed"
}
use std::sync::Mutex;
/// Logs a warning that a pipeline was not compiled at `location` for `id`, at most once
/// per distinct `(location, id)` pair for the lifetime of the process.
///
/// The set of already-reported pairs lives in a process-wide mutex. A poisoned mutex is
/// recovered rather than propagated, so this function does not panic on poisoning.
pub fn warn_pipeline_not_compiled(location: &'static str, id: &str) {
    static SEEN: Mutex<Option<std::collections::HashSet<(&'static str, String)>>> =
        Mutex::new(None);
    let mut guard = SEEN.lock().unwrap_or_else(|poisoned| poisoned.into_inner());
    // The key is moved into the set directly; no second copy of the id is needed.
    let first_occurrence = guard
        .get_or_insert_with(std::collections::HashSet::new)
        .insert((location, id.to_string()));
    // Release the lock before logging so subscriber work never runs under it.
    drop(guard);
    if first_occurrence {
        tracing::warn!(
            target: "awsm_renderer::pipeline_readiness",
            "render-frame preamble: pipeline not compiled at {} (id={}) — skipping. \
             First occurrence — subsequent occurrences for this (location, id) are \
             suppressed for the rest of the session.",
            location,
            id,
        );
    }
}
/// Builds a future that resolves immediately to a successful [`CompileResolution`] for
/// the given group and generation.
// Nothing in this file calls this helper, hence the lint allowance.
#[allow(dead_code)]
fn stub_compile_future(id: PipelineGroupId, generation: u32) -> PendingFuture {
    let resolution = CompileResolution {
        id,
        generation,
        result: Ok(()),
    };
    Box::pin(async move { resolution })
}