use alloc::vec::Vec;
use crate::{J2kBlockCodingMode, J2kError, J2kLosslessEncodeOptions, J2kLosslessSamples};
use j2k_core::{BackendCapabilities, BackendKind, BackendRequest, Unsupported};
/// Backend preference supplied to [`J2kAdaptiveRoutePlanner::plan`].
///
/// The planner dispatches on this value to build a CPU-only report, a
/// strict-device report, or a benchmark-gated accelerated report.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveBackendRequest {
    /// Let the planner pick a device backend, subject to its benchmark gates.
    Accelerated,
    /// Keep every stage on the CPU.
    CpuOnly,
    /// Require the given backend; planning fails when it is not supported.
    StrictDevice(BackendKind),
}
impl J2kAdaptiveBackendRequest {
    /// Converts a core [`BackendRequest`] into its adaptive counterpart.
    ///
    /// `Auto` maps to [`Self::Accelerated`], `Cpu` maps to [`Self::CpuOnly`],
    /// and each named device maps to a [`Self::StrictDevice`] request for the
    /// matching [`BackendKind`].
    #[must_use]
    pub const fn from_backend_request(request: BackendRequest) -> Self {
        match request {
            BackendRequest::Cpu => Self::CpuOnly,
            BackendRequest::Auto => Self::Accelerated,
            BackendRequest::Cuda => Self::StrictDevice(BackendKind::Cuda),
            BackendRequest::Metal => Self::StrictDevice(BackendKind::Metal),
        }
    }
}
/// One row of built-in reference timings for a lossless encode workload.
///
/// `components` and `min_pixels` select which workloads the row applies to;
/// the remaining fields are turned into benchmark evidence for the DWT stage,
/// the HT block-coding stage, and the end-to-end run.
/// NOTE(review): the `_ns` suffix suggests nanoseconds — confirm with the
/// benchmark source.
#[derive(Debug, Clone, Copy)]
struct LosslessEncodeReferencePolicy {
    /// Component count the workload must match exactly.
    components: u8,
    /// Smallest tile area (width × height) the row applies to.
    min_pixels: u64,
    /// CPU timing for the DWT stage.
    dwt_cpu_ns: u64,
    /// Accelerated timing for the DWT stage.
    dwt_accelerated_ns: u64,
    /// CPU timing for the HT block-coding stage.
    ht_cpu_ns: u64,
    /// Accelerated timing for the HT block-coding stage.
    ht_accelerated_ns: u64,
    /// CPU timing for the whole encode.
    end_to_end_cpu_ns: u64,
    /// Accelerated timing for the whole encode.
    end_to_end_accelerated_ns: u64,
    /// Noise margin (percent) attached to every piece of evidence from this row.
    criterion_noise_percent: f64,
}
/// Built-in reference timings searched by
/// `cuda_htj2k_host_encode_reference_policy`.
///
/// The two rows cover three- and four-component workloads of at least
/// 1024 × 1024 pixels. They carry identical stage-level DWT and HT timings and
/// differ only in component count and end-to-end timings.
const CUDA_HTJ2K_HOST_ENCODE_REFERENCE_POLICIES: [LosslessEncodeReferencePolicy; 2] = [
    // Three-component row.
    LosslessEncodeReferencePolicy {
        components: 3,
        min_pixels: 1024 * 1024,
        dwt_cpu_ns: 19_506_000,
        dwt_accelerated_ns: 2_616_000,
        ht_cpu_ns: 4_566_000,
        ht_accelerated_ns: 2_002_000,
        end_to_end_cpu_ns: 81_419_000,
        end_to_end_accelerated_ns: 41_307_000,
        criterion_noise_percent: 2.0,
    },
    // Four-component row.
    LosslessEncodeReferencePolicy {
        components: 4,
        min_pixels: 1024 * 1024,
        dwt_cpu_ns: 19_506_000,
        dwt_accelerated_ns: 2_616_000,
        ht_cpu_ns: 4_566_000,
        ht_accelerated_ns: 2_002_000,
        end_to_end_cpu_ns: 108_350_000,
        end_to_end_accelerated_ns: 53_360_000,
        criterion_noise_percent: 2.0,
    },
];
/// Finds the built-in CUDA reference timings that apply to `workload`.
///
/// A row is returned only for a single-item (`batch_size == 1`), 8-bit,
/// lossless HTJ2K encode with host output, exactly one quality layer, no ROI
/// and no scaling, whose component count equals the row's `components` and
/// whose tile area is at least the row's `min_pixels`. Any other workload
/// yields `None`.
fn cuda_htj2k_host_encode_reference_policy(
    workload: J2kAdaptiveWorkload,
) -> Option<&'static LosslessEncodeReferencePolicy> {
    // None of these conditions depend on the table row, so check them once.
    let shape_is_covered = workload.operation == J2kAdaptiveOperation::Encode
        && workload.codec_mode == J2kAdaptiveCodecMode::Htj2k
        && workload.quality_mode == J2kAdaptiveQualityMode::Lossless
        && workload.output_residency == J2kAdaptiveOutputResidency::Host
        && workload.bit_depth == 8
        && workload.batch_size == 1
        && workload.quality_layers == 1
        && !workload.roi
        && !workload.scaled;
    if !shape_is_covered {
        return None;
    }

    let (width, height) = workload.tile_size;
    let tile_area = u64::from(width).saturating_mul(u64::from(height));
    CUDA_HTJ2K_HOST_ENCODE_REFERENCE_POLICIES
        .iter()
        .find(|row| row.components == workload.components && row.min_pixels <= tile_area)
}
/// Codec operation a workload performs.
///
/// [`J2kAdaptiveWorkload::logical_owner_for`] consults this to decide whether
/// the DWT and IDWT stages may be GPU-owned.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveOperation {
    /// Encoding; makes the DWT stage GPU-eligible on large workloads.
    Encode,
    /// Decoding; makes the IDWT stage GPU-eligible on large workloads.
    Decode,
    /// Transcoding; makes both DWT and IDWT GPU-eligible on large workloads.
    Transcode,
}
/// Block-coding flavour of a workload.
///
/// `J2kAdaptiveRoutePlanner::lossless_encode_workload` derives it from the
/// encode options' block-coding mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveCodecMode {
    /// Classic block coding.
    ClassicJ2k,
    /// High-throughput block coding; makes the HT block-coding stage
    /// GPU-eligible on large workloads.
    Htj2k,
}
/// Quality mode of a workload.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveQualityMode {
    /// Lossless coding; the quantization stage stays on the CPU.
    Lossless,
    /// Lossy coding; makes the quantization stage GPU-eligible on large
    /// workloads.
    Lossy,
}
/// Output residency requested for a workload.
///
/// [`J2kAdaptiveWorkload::new`] defaults to [`Self::Host`], which is also the
/// only residency the built-in CUDA reference timings apply to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveOutputResidency {
    /// Host residency (the default).
    Host,
    /// Device residency.
    Device,
}
/// Description of a codec workload used to decide which backend should own
/// each pipeline stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct J2kAdaptiveWorkload {
    /// Operation being performed.
    pub operation: J2kAdaptiveOperation,
    /// Block-coding flavour.
    pub codec_mode: J2kAdaptiveCodecMode,
    /// Lossless or lossy coding.
    pub quality_mode: J2kAdaptiveQualityMode,
    /// Number of components; MCT is GPU-owned only with at least three.
    pub components: u8,
    /// Sample bit depth.
    pub bit_depth: u8,
    /// Tile extents; their product is the tile area used for size thresholds.
    pub tile_size: (u32, u32),
    /// Number of items processed together.
    pub batch_size: u16,
    /// ROI flag; `false` unless set through `with_roi`.
    pub roi: bool,
    /// Scaling flag; `false` unless set through `with_scaled`.
    pub scaled: bool,
    /// Number of quality layers; `1` unless set through `with_quality_layers`.
    pub quality_layers: u16,
    /// Requested output residency; host unless set through
    /// `with_output_residency`.
    pub output_residency: J2kAdaptiveOutputResidency,
}
impl J2kAdaptiveWorkload {
#[must_use]
pub const fn new(
operation: J2kAdaptiveOperation,
codec_mode: J2kAdaptiveCodecMode,
quality_mode: J2kAdaptiveQualityMode,
components: u8,
bit_depth: u8,
tile_size: (u32, u32),
batch_size: u16,
) -> Self {
Self {
operation,
codec_mode,
quality_mode,
components,
bit_depth,
tile_size,
batch_size,
roi: false,
scaled: false,
quality_layers: 1,
output_residency: J2kAdaptiveOutputResidency::Host,
}
}
#[must_use]
pub const fn with_roi(mut self, roi: bool) -> Self {
self.roi = roi;
self
}
#[must_use]
pub const fn with_scaled(mut self, scaled: bool) -> Self {
self.scaled = scaled;
self
}
#[must_use]
pub const fn with_quality_layers(mut self, quality_layers: u16) -> Self {
self.quality_layers = quality_layers;
self
}
#[must_use]
pub const fn with_output_residency(
mut self,
output_residency: J2kAdaptiveOutputResidency,
) -> Self {
self.output_residency = output_residency;
self
}
#[must_use]
pub fn logical_owner_for(self, stage: J2kAdaptiveStage) -> J2kAdaptiveStageOwner {
if self.is_small_cpu_workload() {
return J2kAdaptiveStageOwner::Cpu;
}
match stage {
J2kAdaptiveStage::MarkerParsing
| J2kAdaptiveStage::CopySync
| J2kAdaptiveStage::Validation => J2kAdaptiveStageOwner::Cpu,
J2kAdaptiveStage::Mct => {
if self.components >= 3 && self.is_wsi_shaped() {
J2kAdaptiveStageOwner::Gpu
} else {
J2kAdaptiveStageOwner::Variable
}
}
J2kAdaptiveStage::Dwt => match self.operation {
J2kAdaptiveOperation::Encode | J2kAdaptiveOperation::Transcode
if self.is_wsi_shaped() =>
{
J2kAdaptiveStageOwner::Gpu
}
_ => J2kAdaptiveStageOwner::Cpu,
},
J2kAdaptiveStage::Idwt => match self.operation {
J2kAdaptiveOperation::Decode | J2kAdaptiveOperation::Transcode
if self.is_wsi_shaped() =>
{
J2kAdaptiveStageOwner::Gpu
}
_ => J2kAdaptiveStageOwner::Cpu,
},
J2kAdaptiveStage::Quantization => {
if self.quality_mode == J2kAdaptiveQualityMode::Lossy && self.is_wsi_shaped() {
J2kAdaptiveStageOwner::Gpu
} else {
J2kAdaptiveStageOwner::Cpu
}
}
J2kAdaptiveStage::HtBlockCoding => {
if self.codec_mode == J2kAdaptiveCodecMode::Htj2k && self.is_wsi_shaped() {
J2kAdaptiveStageOwner::Gpu
} else {
J2kAdaptiveStageOwner::Cpu
}
}
J2kAdaptiveStage::Tier1
| J2kAdaptiveStage::PcrdRateControl
| J2kAdaptiveStage::Packetization
| J2kAdaptiveStage::CodestreamAssembly => J2kAdaptiveStageOwner::Variable,
}
}
fn is_wsi_shaped(self) -> bool {
let pixels = u64::from(self.tile_size.0).saturating_mul(u64::from(self.tile_size.1));
pixels >= 512 * 512 || self.batch_size >= 16
}
fn is_small_cpu_workload(self) -> bool {
let pixels = u64::from(self.tile_size.0).saturating_mul(u64::from(self.tile_size.1));
pixels < 512 * 512 && self.batch_size <= 1
}
}
/// Pipeline stage that the route planner assigns to a backend.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveStage {
    /// Profile label `marker_parsing`.
    MarkerParsing,
    /// Profile label `mct_rct_ict`.
    Mct,
    /// Profile label `dwt`.
    Dwt,
    /// Profile label `idwt`.
    Idwt,
    /// Profile label `quantization`.
    Quantization,
    /// Profile label `tier1`.
    Tier1,
    /// Profile label `ht_block_coding`.
    HtBlockCoding,
    /// Profile label `pcrd_rate_control`.
    PcrdRateControl,
    /// Profile label `packetization`.
    Packetization,
    /// Profile label `codestream_assembly`.
    CodestreamAssembly,
    /// Profile label `copy_sync`.
    CopySync,
    /// Profile label `validation`.
    Validation,
}

impl J2kAdaptiveStage {
    /// Every stage, in declaration order. Route reports list their stage
    /// decisions in this order.
    pub const ALL: [Self; 12] = [
        Self::MarkerParsing,
        Self::Mct,
        Self::Dwt,
        Self::Idwt,
        Self::Quantization,
        Self::Tier1,
        Self::HtBlockCoding,
        Self::PcrdRateControl,
        Self::Packetization,
        Self::CodestreamAssembly,
        Self::CopySync,
        Self::Validation,
    ];

    /// Returns the stable snake_case label for this stage.
    #[must_use]
    pub const fn profile_label(self) -> &'static str {
        match self {
            Self::CodestreamAssembly => "codestream_assembly",
            Self::CopySync => "copy_sync",
            Self::Dwt => "dwt",
            Self::HtBlockCoding => "ht_block_coding",
            Self::Idwt => "idwt",
            Self::MarkerParsing => "marker_parsing",
            Self::Mct => "mct_rct_ict",
            Self::Packetization => "packetization",
            Self::PcrdRateControl => "pcrd_rate_control",
            Self::Quantization => "quantization",
            Self::Tier1 => "tier1",
            Self::Validation => "validation",
        }
    }
}
/// Logical owner of a stage, as reported by
/// [`J2kAdaptiveWorkload::logical_owner_for`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveStageOwner {
    /// The stage belongs on the CPU.
    Cpu,
    /// The stage belongs on a device, subject to the planner's gates.
    Gpu,
    /// No fixed owner: the planner keeps the stage on the CPU unless benchmark
    /// evidence approves a device.
    Variable,
}
/// Root-cause category attached to an RCA finding.
///
/// The planner does not branch on the specific reason; it only copies it into
/// the stage decision when a finding reclassifies a stage to the CPU.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveRcaReason {
    AlgorithmicMismatch,
    TransferSyncOverhead,
    MissingBatching,
    MissingResidency,
    TooSmallWorkload,
    BenchmarkMismatch,
    /// Reason recorded by the lossless-encode policy for the CUDA MCT stage.
    CpuGenuinelyBetter,
}
/// Recorded root-cause finding for one stage on one backend.
///
/// The planner looks findings up by `(stage, backend)` and honours only those
/// with `reclassify_cpu` set.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct J2kAdaptiveRcaFinding {
    /// Stage the finding applies to.
    pub stage: J2kAdaptiveStage,
    /// Backend the finding applies to.
    pub backend: BackendKind,
    /// Root-cause category reported alongside the reclassification.
    pub reason: J2kAdaptiveRcaReason,
    /// When `true`, a GPU-owned stage is reclassified to the CPU.
    pub reclassify_cpu: bool,
}
impl J2kAdaptiveRcaFinding {
    /// Builds a finding that reclassifies `stage` on `backend` to the CPU for
    /// the given `reason` (the `reclassify_cpu` flag is set to `true`).
    #[must_use]
    pub const fn reclassify_cpu(
        stage: J2kAdaptiveStage,
        backend: BackendKind,
        reason: J2kAdaptiveRcaReason,
    ) -> Self {
        let reclassify_cpu = true;
        Self {
            reclassify_cpu,
            reason,
            backend,
            stage,
        }
    }
}
/// One benchmark comparison between the CPU and an accelerated backend.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct J2kAdaptiveBenchmarkEvidence {
    /// Whether the timings cover a single stage or the whole run.
    pub scope: J2kAdaptiveBenchmarkScope,
    /// Accelerated backend the timings were taken on.
    pub backend: BackendKind,
    /// CPU timing.
    pub cpu_ns: u64,
    /// Accelerated timing.
    pub accelerated_ns: u64,
    /// Noise margin (percent) added to the policy's minimum speedup when the
    /// evidence is judged.
    pub criterion_noise_percent: f64,
}
impl J2kAdaptiveBenchmarkEvidence {
    /// Builds evidence scoped to a single `stage`.
    #[must_use]
    pub const fn stage(
        stage: J2kAdaptiveStage,
        backend: BackendKind,
        cpu_ns: u64,
        accelerated_ns: u64,
        criterion_noise_percent: f64,
    ) -> Self {
        Self {
            scope: J2kAdaptiveBenchmarkScope::Stage(stage),
            backend,
            cpu_ns,
            accelerated_ns,
            criterion_noise_percent,
        }
    }

    /// Builds evidence scoped to the whole run.
    #[must_use]
    pub const fn end_to_end(
        backend: BackendKind,
        cpu_ns: u64,
        accelerated_ns: u64,
        criterion_noise_percent: f64,
    ) -> Self {
        Self {
            scope: J2kAdaptiveBenchmarkScope::EndToEnd,
            backend,
            cpu_ns,
            accelerated_ns,
            criterion_noise_percent,
        }
    }

    /// Speedup of the accelerated path over the CPU, in percent:
    /// `(cpu_ns / accelerated_ns - 1) * 100`.
    ///
    /// Returns `f64::INFINITY` when `accelerated_ns` is zero, whatever
    /// `cpu_ns` is.
    #[must_use]
    // The timings are converted to `f64` only to form a ratio; losing the
    // lowest bits of very large values is acceptable here.
    #[allow(clippy::cast_precision_loss)]
    pub fn improvement_percent(self) -> f64 {
        if self.accelerated_ns == 0 {
            return f64::INFINITY;
        }
        ((self.cpu_ns as f64 / self.accelerated_ns as f64) - 1.0) * 100.0
    }

    /// Returns `true` when this evidence clears the gate described by `policy`.
    ///
    /// Both timings must be non-zero and the improvement must reach the
    /// policy's minimum speedup plus this evidence's noise margin. Evidence
    /// whose noise margin is NaN or negative never passes: a negative margin
    /// would otherwise lower the threshold and could approve an accelerated
    /// path that is slower than the CPU.
    fn passes(self, policy: J2kAdaptiveGatePolicy) -> bool {
        let noise = self.criterion_noise_percent;
        if noise.is_nan() || noise < 0.0 {
            return false;
        }
        self.cpu_ns > 0
            && self.accelerated_ns > 0
            && self.improvement_percent() >= policy.min_speedup_percent + noise
    }
}
/// What a piece of benchmark evidence measures.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveBenchmarkScope {
    /// Timings for the named stage only.
    Stage(J2kAdaptiveStage),
    /// Timings for the whole run.
    EndToEnd,
}
/// Benchmark evidence collected for the planner, kept in insertion order.
///
/// Lookups scan from the back, so evidence pushed later takes precedence over
/// earlier evidence for the same backend (and stage).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct J2kAdaptiveBenchmarks {
    /// Evidence pushed through `push_stage`.
    stage: Vec<J2kAdaptiveBenchmarkEvidence>,
    /// Evidence pushed through `push_end_to_end`.
    end_to_end: Vec<J2kAdaptiveBenchmarkEvidence>,
}
impl J2kAdaptiveBenchmarks {
    /// Records stage-scoped evidence.
    ///
    /// Passing end-to-end evidence here is a caller bug and trips a debug
    /// assertion. In builds without debug assertions the evidence is stored
    /// under its declared scope instead of being misfiled as stage evidence.
    pub fn push_stage(&mut self, evidence: J2kAdaptiveBenchmarkEvidence) {
        debug_assert!(matches!(
            evidence.scope,
            J2kAdaptiveBenchmarkScope::Stage(_)
        ));
        self.store_by_scope(evidence);
    }

    /// Records end-to-end evidence.
    ///
    /// Passing stage-scoped evidence here is a caller bug and trips a debug
    /// assertion. In builds without debug assertions the evidence is stored
    /// under its declared scope, so a single-stage timing can never satisfy
    /// the end-to-end gate.
    pub fn push_end_to_end(&mut self, evidence: J2kAdaptiveBenchmarkEvidence) {
        debug_assert!(matches!(
            evidence.scope,
            J2kAdaptiveBenchmarkScope::EndToEnd
        ));
        self.store_by_scope(evidence);
    }

    /// Appends `evidence` to the list that matches its declared scope.
    fn store_by_scope(&mut self, evidence: J2kAdaptiveBenchmarkEvidence) {
        match evidence.scope {
            J2kAdaptiveBenchmarkScope::Stage(_) => self.stage.push(evidence),
            J2kAdaptiveBenchmarkScope::EndToEnd => self.end_to_end.push(evidence),
        }
    }

    /// Most recently pushed evidence for `stage` on `backend`, if any.
    fn stage_for(
        &self,
        stage: J2kAdaptiveStage,
        backend: BackendKind,
    ) -> Option<J2kAdaptiveBenchmarkEvidence> {
        self.stage.iter().rev().copied().find(|evidence| {
            evidence.backend == backend && evidence.scope == J2kAdaptiveBenchmarkScope::Stage(stage)
        })
    }

    /// Most recently pushed end-to-end evidence for `backend`, if any.
    fn end_to_end_for(&self, backend: BackendKind) -> Option<J2kAdaptiveBenchmarkEvidence> {
        self.end_to_end
            .iter()
            .rev()
            .copied()
            .find(|evidence| evidence.backend == backend)
    }

    /// True when any end-to-end or stage evidence exists for `backend`.
    fn has_evidence_for(&self, backend: BackendKind) -> bool {
        self.end_to_end_for(backend).is_some()
            || self
                .stage
                .iter()
                .any(|evidence| evidence.backend == backend)
    }

    /// Accelerated timing used to rank `backend` as a candidate device.
    ///
    /// Prefers the latest end-to-end evidence and falls back to the latest
    /// stage evidence (for any stage) on that backend.
    fn best_observed_ns_for(&self, backend: BackendKind) -> Option<u64> {
        let end_to_end = self
            .end_to_end_for(backend)
            .map(|evidence| evidence.accelerated_ns);
        let stage = self
            .stage
            .iter()
            .rev()
            .find(|evidence| evidence.backend == backend)
            .map(|evidence| evidence.accelerated_ns);
        end_to_end.or(stage)
    }
}
/// Threshold that benchmark evidence must clear before a device is approved.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct J2kAdaptiveGatePolicy {
    /// Minimum improvement over the CPU, in percent, before the evidence's own
    /// noise margin is added.
    pub min_speedup_percent: f64,
}

impl Default for J2kAdaptiveGatePolicy {
    /// Requires a 10 % speedup by default.
    fn default() -> Self {
        const DEFAULT_MIN_SPEEDUP_PERCENT: f64 = 10.0;
        Self {
            min_speedup_percent: DEFAULT_MIN_SPEEDUP_PERCENT,
        }
    }
}
/// Overall shape of a planned route.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveRouteKind {
    /// No stage was assigned to a device.
    CpuOnly,
    /// An accelerated plan in which at least one stage runs on the selected
    /// device.
    Hybrid,
    /// Route produced for a strict-device request.
    StrictDevice,
}
/// Outcome of the planner's gate for a single stage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum J2kAdaptiveStageGateStatus {
    /// The stage is reported as CPU work without consulting benchmark gates.
    CpuShaped,
    /// A variable-owner stage that was not approved for the device.
    VariableCpuDefault,
    /// The stage's benchmark evidence and the end-to-end gate both passed.
    Approved,
    /// A GPU-owned stage with no usable benchmark evidence; requires RCA.
    BenchmarkGateMissing,
    /// A GPU-owned stage whose stage evidence failed the gate; requires RCA.
    BlockedNeedsRca,
    /// A GPU-owned stage moved to the CPU by a recorded RCA finding.
    ReclassifiedCpu,
    /// A stage placed on the device by a strict-device request.
    StrictDeviceProof,
    /// A GPU-owned stage held back because the end-to-end gate did not pass.
    EndToEndGateBlocked,
}
/// The planner's verdict for one stage of a route.
#[derive(Debug, Clone, PartialEq)]
pub struct J2kAdaptiveStageDecision {
    /// Stage the decision is about.
    pub stage: J2kAdaptiveStage,
    /// Owner the workload shape assigns to the stage.
    pub logical_owner: J2kAdaptiveStageOwner,
    /// Backend the stage was actually routed to.
    pub selected_backend: BackendKind,
    /// Gate outcome that led to `selected_backend`.
    pub gate_status: J2kAdaptiveStageGateStatus,
    /// Improvement reported by the stage's benchmark evidence, when any was
    /// consulted.
    pub improvement_percent: Option<f64>,
    /// Reason taken from the RCA finding when the stage was reclassified.
    pub rca_reason: Option<J2kAdaptiveRcaReason>,
}
impl J2kAdaptiveStageDecision {
#[must_use]
pub fn requires_rca(&self) -> bool {
matches!(
self.gate_status,
J2kAdaptiveStageGateStatus::BenchmarkGateMissing
| J2kAdaptiveStageGateStatus::BlockedNeedsRca
)
}
}
/// Full routing plan produced by the planner.
#[derive(Debug, Clone, PartialEq)]
pub struct J2kAdaptiveRouteReport {
    /// Request the plan was built for.
    pub request: J2kAdaptiveBackendRequest,
    /// Overall shape of the route.
    pub route_kind: J2kAdaptiveRouteKind,
    /// Device used by the route, if any.
    pub selected_device: Option<BackendKind>,
    /// One decision per stage, in `J2kAdaptiveStage::ALL` order.
    pub stages: Vec<J2kAdaptiveStageDecision>,
}
impl J2kAdaptiveRouteReport {
#[must_use]
pub fn stage(&self, stage: J2kAdaptiveStage) -> Option<&J2kAdaptiveStageDecision> {
self.stages.iter().find(|decision| decision.stage == stage)
}
#[must_use]
pub fn has_unresolved_rca(&self) -> bool {
self.stages
.iter()
.any(J2kAdaptiveStageDecision::requires_rca)
}
}
/// Plans which backend runs each stage of a workload.
///
/// The planner combines the available backend capabilities, a gate policy for
/// benchmark evidence, and any recorded RCA findings.
#[derive(Debug, Clone, PartialEq)]
pub struct J2kAdaptiveRoutePlanner {
    /// Backends the planner may route to.
    capabilities: BackendCapabilities,
    /// Threshold applied to benchmark evidence.
    policy: J2kAdaptiveGatePolicy,
    /// Findings that can reclassify GPU-owned stages to the CPU.
    rca_findings: Vec<J2kAdaptiveRcaFinding>,
}
impl J2kAdaptiveRoutePlanner {
/// Creates a planner for `capabilities` with the default gate policy and no
/// RCA findings.
#[must_use]
pub fn new(capabilities: BackendCapabilities) -> Self {
    let policy = J2kAdaptiveGatePolicy::default();
    let rca_findings = Vec::new();
    Self {
        capabilities,
        policy,
        rca_findings,
    }
}
/// Creates a planner for `capabilities` with the lossless-encode policy
/// already applied.
#[must_use]
pub fn lossless_encode(capabilities: BackendCapabilities) -> Self {
    let planner = Self::new(capabilities);
    planner.with_lossless_encode_policy()
}
/// Creates a planner from `BackendCapabilities::compile_time_defaults()`.
#[must_use]
pub fn compile_time_defaults() -> Self {
    let capabilities = BackendCapabilities::compile_time_defaults();
    Self::new(capabilities)
}
/// Replaces the gate policy applied to benchmark evidence and returns the
/// updated planner.
#[must_use]
pub const fn with_policy(mut self, policy: J2kAdaptiveGatePolicy) -> Self {
self.policy = policy;
self
}
/// Appends `finding` to the planner's RCA findings and returns the updated
/// planner. Existing findings are kept.
#[must_use]
pub fn with_rca_finding(self, finding: J2kAdaptiveRcaFinding) -> Self {
    let mut planner = self;
    planner.rca_findings.push(finding);
    planner
}
/// Adds the lossless-encode policy: the MCT stage on CUDA is reclassified to
/// the CPU with reason `CpuGenuinelyBetter`.
#[must_use]
pub fn with_lossless_encode_policy(self) -> Self {
    let cuda_mct_on_cpu = J2kAdaptiveRcaFinding::reclassify_cpu(
        J2kAdaptiveStage::Mct,
        BackendKind::Cuda,
        J2kAdaptiveRcaReason::CpuGenuinelyBetter,
    );
    self.with_rca_finding(cuda_mct_on_cpu)
}
/// Plans an accelerated route for a lossless encode of `samples`.
///
/// The workload is derived from `samples` and `options`, and the benchmark
/// evidence comes from the built-in reference timings for that workload.
///
/// # Errors
///
/// Propagates any error returned by [`Self::plan`].
pub fn plan_lossless_encode(
    &self,
    samples: J2kLosslessSamples<'_>,
    options: J2kLosslessEncodeOptions,
) -> Result<J2kAdaptiveRouteReport, J2kError> {
    let workload = Self::lossless_encode_workload(samples, options);
    let reference_benchmarks = Self::lossless_encode_benchmarks(workload);
    let request = J2kAdaptiveBackendRequest::Accelerated;
    self.plan(workload, request, &reference_benchmarks)
}
/// Describes a lossless encode of `samples` as an adaptive workload.
///
/// The codec mode follows `options.block_coding_mode`, the tile size is the
/// sample width and height, and the batch size is one. All other knobs keep
/// the defaults of [`J2kAdaptiveWorkload::new`].
#[must_use]
pub fn lossless_encode_workload(
    samples: J2kLosslessSamples<'_>,
    options: J2kLosslessEncodeOptions,
) -> J2kAdaptiveWorkload {
    let tile_size = (samples.width, samples.height);
    let codec_mode = match options.block_coding_mode {
        J2kBlockCodingMode::HighThroughput => J2kAdaptiveCodecMode::Htj2k,
        J2kBlockCodingMode::Classic => J2kAdaptiveCodecMode::ClassicJ2k,
    };
    J2kAdaptiveWorkload::new(
        J2kAdaptiveOperation::Encode,
        codec_mode,
        J2kAdaptiveQualityMode::Lossless,
        samples.components,
        samples.bit_depth,
        tile_size,
        1,
    )
}
/// Builds the built-in benchmark evidence that applies to `workload`.
///
/// When a CUDA reference row matches, the result holds DWT and HT
/// block-coding stage evidence followed by end-to-end evidence, all for
/// CUDA. Otherwise the result is empty.
#[must_use]
pub fn lossless_encode_benchmarks(workload: J2kAdaptiveWorkload) -> J2kAdaptiveBenchmarks {
    let mut benchmarks = J2kAdaptiveBenchmarks::default();
    let Some(reference) = cuda_htj2k_host_encode_reference_policy(workload) else {
        return benchmarks;
    };

    let noise = reference.criterion_noise_percent;
    let stage_timings = [
        (
            J2kAdaptiveStage::Dwt,
            reference.dwt_cpu_ns,
            reference.dwt_accelerated_ns,
        ),
        (
            J2kAdaptiveStage::HtBlockCoding,
            reference.ht_cpu_ns,
            reference.ht_accelerated_ns,
        ),
    ];
    for (stage, cpu_ns, accelerated_ns) in stage_timings {
        benchmarks.push_stage(J2kAdaptiveBenchmarkEvidence::stage(
            stage,
            BackendKind::Cuda,
            cpu_ns,
            accelerated_ns,
            noise,
        ));
    }
    benchmarks.push_end_to_end(J2kAdaptiveBenchmarkEvidence::end_to_end(
        BackendKind::Cuda,
        reference.end_to_end_cpu_ns,
        reference.end_to_end_accelerated_ns,
        noise,
    ));
    benchmarks
}
/// Plans a route for `workload` according to `request`.
///
/// `benchmarks` is consulted only for accelerated requests.
///
/// # Errors
///
/// Returns an error when a strict-device request names a backend the planner's
/// capabilities do not support.
pub fn plan(
    &self,
    workload: J2kAdaptiveWorkload,
    request: J2kAdaptiveBackendRequest,
    benchmarks: &J2kAdaptiveBenchmarks,
) -> Result<J2kAdaptiveRouteReport, J2kError> {
    match request {
        J2kAdaptiveBackendRequest::Accelerated => {
            let report = self.accelerated_report(workload, request, benchmarks);
            Ok(report)
        }
        J2kAdaptiveBackendRequest::StrictDevice(backend) => {
            self.strict_device_report(workload, request, backend)
        }
        J2kAdaptiveBackendRequest::CpuOnly => {
            let report = Self::cpu_only_report(workload, request);
            Ok(report)
        }
    }
}
fn cpu_only_report(
workload: J2kAdaptiveWorkload,
request: J2kAdaptiveBackendRequest,
) -> J2kAdaptiveRouteReport {
let stages = J2kAdaptiveStage::ALL
.into_iter()
.map(|stage| J2kAdaptiveStageDecision {
stage,
logical_owner: workload.logical_owner_for(stage),
selected_backend: BackendKind::Cpu,
gate_status: J2kAdaptiveStageGateStatus::CpuShaped,
improvement_percent: None,
rca_reason: None,
})
.collect();
J2kAdaptiveRouteReport {
request,
route_kind: J2kAdaptiveRouteKind::CpuOnly,
selected_device: None,
stages,
}
}
/// Builds the report for a strict-device request.
///
/// CPU-owned stages stay on the CPU; every other stage goes to `backend`.
/// Stages that end up on the CPU are marked `CpuShaped`, the rest
/// `StrictDeviceProof`. No benchmark evidence is consulted.
///
/// # Errors
///
/// Returns an `Unsupported` error when `backend` is not supported by the
/// planner's capabilities.
fn strict_device_report(
    &self,
    workload: J2kAdaptiveWorkload,
    request: J2kAdaptiveBackendRequest,
    backend: BackendKind,
) -> Result<J2kAdaptiveRouteReport, J2kError> {
    if !self.supports_backend(backend) {
        let unavailable = Unsupported {
            what: "strict JPEG 2000 device route is unavailable",
        };
        return Err(unavailable.into());
    }

    let mut stages = Vec::with_capacity(J2kAdaptiveStage::ALL.len());
    for stage in J2kAdaptiveStage::ALL {
        let logical_owner = workload.logical_owner_for(stage);
        let selected_backend = match logical_owner {
            J2kAdaptiveStageOwner::Cpu => BackendKind::Cpu,
            J2kAdaptiveStageOwner::Gpu | J2kAdaptiveStageOwner::Variable => backend,
        };
        // The status follows the backend actually selected, so a strict
        // request for the CPU backend reports every stage as CPU-shaped.
        let gate_status = if selected_backend == BackendKind::Cpu {
            J2kAdaptiveStageGateStatus::CpuShaped
        } else {
            J2kAdaptiveStageGateStatus::StrictDeviceProof
        };
        stages.push(J2kAdaptiveStageDecision {
            stage,
            logical_owner,
            selected_backend,
            gate_status,
            improvement_percent: None,
            rca_reason: None,
        });
    }

    Ok(J2kAdaptiveRouteReport {
        request,
        route_kind: J2kAdaptiveRouteKind::StrictDevice,
        selected_device: Some(backend),
        stages,
    })
}
fn accelerated_report(
&self,
workload: J2kAdaptiveWorkload,
request: J2kAdaptiveBackendRequest,
benchmarks: &J2kAdaptiveBenchmarks,
) -> J2kAdaptiveRouteReport {
let Some(backend) = self.best_approved_device(workload, benchmarks) else {
return self.gated_cpu_report(
workload,
request,
self.best_candidate_device(benchmarks),
benchmarks,
);
};
let mut stages = Vec::with_capacity(J2kAdaptiveStage::ALL.len());
let mut unresolved = false;
for stage in J2kAdaptiveStage::ALL {
let decision = self.stage_decision(workload, stage, backend, benchmarks, true);
unresolved |= decision.requires_rca();
stages.push(decision);
}
if unresolved {
for decision in &mut stages {
decision.selected_backend = BackendKind::Cpu;
}
return J2kAdaptiveRouteReport {
request,
route_kind: J2kAdaptiveRouteKind::CpuOnly,
selected_device: None,
stages,
};
}
let has_device_stage = stages
.iter()
.any(|decision| decision.selected_backend == backend);
J2kAdaptiveRouteReport {
request,
route_kind: if has_device_stage {
J2kAdaptiveRouteKind::Hybrid
} else {
J2kAdaptiveRouteKind::CpuOnly
},
selected_device: has_device_stage.then_some(backend),
stages,
}
}
fn gated_cpu_report(
&self,
workload: J2kAdaptiveWorkload,
request: J2kAdaptiveBackendRequest,
backend: Option<BackendKind>,
benchmarks: &J2kAdaptiveBenchmarks,
) -> J2kAdaptiveRouteReport {
let stages = J2kAdaptiveStage::ALL
.into_iter()
.map(|stage| {
let mut decision = backend.map_or_else(
|| {
let logical_owner = workload.logical_owner_for(stage);
J2kAdaptiveStageDecision {
stage,
logical_owner,
selected_backend: BackendKind::Cpu,
gate_status: if logical_owner == J2kAdaptiveStageOwner::Gpu {
J2kAdaptiveStageGateStatus::BenchmarkGateMissing
} else {
J2kAdaptiveStageGateStatus::CpuShaped
},
improvement_percent: None,
rca_reason: None,
}
},
|backend| {
let end_to_end_passed = benchmarks
.end_to_end_for(backend)
.is_some_and(|evidence| evidence.passes(self.policy));
self.stage_decision(workload, stage, backend, benchmarks, end_to_end_passed)
},
);
decision.selected_backend = BackendKind::Cpu;
decision
})
.collect();
J2kAdaptiveRouteReport {
request,
route_kind: J2kAdaptiveRouteKind::CpuOnly,
selected_device: None,
stages,
}
}
fn stage_decision(
&self,
workload: J2kAdaptiveWorkload,
stage: J2kAdaptiveStage,
backend: BackendKind,
benchmarks: &J2kAdaptiveBenchmarks,
end_to_end_passed: bool,
) -> J2kAdaptiveStageDecision {
let logical_owner = workload.logical_owner_for(stage);
match logical_owner {
J2kAdaptiveStageOwner::Cpu => J2kAdaptiveStageDecision {
stage,
logical_owner,
selected_backend: BackendKind::Cpu,
gate_status: J2kAdaptiveStageGateStatus::CpuShaped,
improvement_percent: None,
rca_reason: None,
},
J2kAdaptiveStageOwner::Variable => {
let evidence = benchmarks.stage_for(stage, backend);
let approved = end_to_end_passed
&& evidence.is_some_and(|evidence| evidence.passes(self.policy));
J2kAdaptiveStageDecision {
stage,
logical_owner,
selected_backend: if approved { backend } else { BackendKind::Cpu },
gate_status: if approved {
J2kAdaptiveStageGateStatus::Approved
} else {
J2kAdaptiveStageGateStatus::VariableCpuDefault
},
improvement_percent: evidence
.map(J2kAdaptiveBenchmarkEvidence::improvement_percent),
rca_reason: None,
}
}
J2kAdaptiveStageOwner::Gpu => {
if let Some(finding) = self.rca_for(stage, backend) {
return J2kAdaptiveStageDecision {
stage,
logical_owner,
selected_backend: BackendKind::Cpu,
gate_status: J2kAdaptiveStageGateStatus::ReclassifiedCpu,
improvement_percent: benchmarks
.stage_for(stage, backend)
.map(J2kAdaptiveBenchmarkEvidence::improvement_percent),
rca_reason: Some(finding.reason),
};
}
let evidence = benchmarks.stage_for(stage, backend);
let gate_status = match (end_to_end_passed, evidence) {
(false, _) => J2kAdaptiveStageGateStatus::EndToEndGateBlocked,
(true, None) => J2kAdaptiveStageGateStatus::BenchmarkGateMissing,
(true, Some(evidence)) if evidence.passes(self.policy) => {
J2kAdaptiveStageGateStatus::Approved
}
(true, Some(_)) => J2kAdaptiveStageGateStatus::BlockedNeedsRca,
};
J2kAdaptiveStageDecision {
stage,
logical_owner,
selected_backend: if gate_status == J2kAdaptiveStageGateStatus::Approved {
backend
} else {
BackendKind::Cpu
},
gate_status,
improvement_percent: evidence
.map(J2kAdaptiveBenchmarkEvidence::improvement_percent),
rca_reason: None,
}
}
}
}
/// Picks the device an accelerated route may use, if any.
///
/// Metal and CUDA are considered in that order. A device qualifies when it is
/// supported, its end-to-end evidence passes the policy, and no stage
/// decision against it requires RCA. Among qualifying devices the one with
/// the smallest end-to-end accelerated timing wins; on a tie the earlier
/// device is kept.
fn best_approved_device(
    &self,
    workload: J2kAdaptiveWorkload,
    benchmarks: &J2kAdaptiveBenchmarks,
) -> Option<BackendKind> {
    let mut fastest: Option<(BackendKind, u64)> = None;
    for backend in [BackendKind::Metal, BackendKind::Cuda] {
        if !self.supports_backend(backend) {
            continue;
        }
        let Some(evidence) = benchmarks.end_to_end_for(backend) else {
            continue;
        };
        if !evidence.passes(self.policy) {
            continue;
        }
        let needs_rca = J2kAdaptiveStage::ALL.iter().any(|&stage| {
            self.stage_decision(workload, stage, backend, benchmarks, true)
                .requires_rca()
        });
        if needs_rca {
            continue;
        }
        // Strict comparison keeps the earlier device when timings are equal.
        let improves = match fastest {
            Some((_, fastest_ns)) => evidence.accelerated_ns < fastest_ns,
            None => true,
        };
        if improves {
            fastest = Some((backend, evidence.accelerated_ns));
        }
    }
    fastest.map(|(backend, _)| backend)
}
/// Picks the device to explain a gated CPU report with, if any.
///
/// Metal and CUDA are considered in that order. A device is a candidate when
/// it is supported and has any benchmark evidence. The candidate with the
/// smallest observed accelerated timing wins; on a tie the earlier device is
/// kept.
fn best_candidate_device(&self, benchmarks: &J2kAdaptiveBenchmarks) -> Option<BackendKind> {
    [BackendKind::Metal, BackendKind::Cuda]
        .into_iter()
        .filter(|&backend| self.supports_backend(backend) && benchmarks.has_evidence_for(backend))
        .map(|backend| {
            let observed_ns = benchmarks
                .best_observed_ns_for(backend)
                .unwrap_or(u64::MAX);
            (observed_ns, backend)
        })
        .min_by_key(|&(observed_ns, _)| observed_ns)
        .map(|(_, backend)| backend)
}
/// Reports whether `backend` may be used: the CPU always, Metal and CUDA
/// according to the planner's capability flags.
fn supports_backend(&self, backend: BackendKind) -> bool {
    let capabilities = &self.capabilities;
    match backend {
        BackendKind::Cuda => capabilities.cuda,
        BackendKind::Metal => capabilities.metal,
        BackendKind::Cpu => true,
    }
}
/// Returns the first recorded finding that reclassifies `stage` on `backend`
/// to the CPU, if one exists.
fn rca_for(
    &self,
    stage: J2kAdaptiveStage,
    backend: BackendKind,
) -> Option<J2kAdaptiveRcaFinding> {
    self.rca_findings
        .iter()
        .find(|finding| {
            finding.reclassify_cpu && finding.stage == stage && finding.backend == backend
        })
        .copied()
}
}