use core::fmt::{self, Write as _};
use crate::{BatchTranscodeReport, TranscodeReport, TranscodeTimingReport};
/// Identifies one stage of the JPEG-to-HTJ2K transcode pipeline map.
///
/// The variants are declared in the same order in which
/// `TranscodePipelineMap::from_timings` emits its stage rows.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TranscodePipelineStageKind {
    /// Reported under the label `entropy_decode`.
    EntropyDecode,
    /// Reported under the label `coefficient_prep`.
    CoefficientPrep,
    /// Reported under the label `transform`.
    Transform,
    /// Reported under the label `quantization_code_block_prep`.
    QuantizationCodeBlockPrep,
    /// Reported under the label `packetization`.
    Packetization,
    /// Reported under the label `codestream_assembly`.
    CodestreamAssembly,
}
impl TranscodePipelineStageKind {
    /// Returns the fixed snake_case label for this stage.
    ///
    /// This is the text that appears after `stage=` and
    /// `recommend_next_stage=` in the pipeline debug report.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: a new variant must pick its label explicitly.
        match self {
            Self::EntropyDecode => "entropy_decode",
            Self::CoefficientPrep => "coefficient_prep",
            Self::Transform => "transform",
            Self::QuantizationCodeBlockPrep => "quantization_code_block_prep",
            Self::Packetization => "packetization",
            Self::CodestreamAssembly => "codestream_assembly",
        }
    }
}
impl fmt::Display for TranscodePipelineStageKind {
    /// Writes the label returned by [`TranscodePipelineStageKind::as_str`].
    ///
    /// The label goes through [`fmt::Formatter::pad`], so width, fill,
    /// alignment and precision flags (for example `{:<28}`) are honored.
    /// With a plain `{}` the output is exactly the `as_str` label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
/// Classifies where the work of a pipeline stage was observed to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TranscodeStageProcessor {
    /// CPU only; also the classification used when no accelerator time,
    /// transfer time, or dispatch is reported for a stage.
    Cpu,
    /// Accelerator evidence only (Metal time, transfer time, or dispatches)
    /// with no CPU time and no fallback jobs.
    Metal,
    /// Both CPU evidence and accelerator evidence are present.
    Hybrid,
}
impl TranscodeStageProcessor {
    /// Returns the fixed label for this processor classification.
    ///
    /// This is the text that appears after `processor=` in the pipeline
    /// debug report. Unlike the stage labels, these are capitalized.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        // Exhaustive on purpose: a new variant must pick its label explicitly.
        match self {
            Self::Cpu => "Cpu",
            Self::Metal => "Metal",
            Self::Hybrid => "Hybrid",
        }
    }
}
impl fmt::Display for TranscodeStageProcessor {
    /// Writes the label returned by [`TranscodeStageProcessor::as_str`].
    ///
    /// The label goes through [`fmt::Formatter::pad`], so width, fill,
    /// alignment and precision flags (for example `{:>6}`) are honored.
    /// With a plain `{}` the output is exactly the `as_str` label.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(self.as_str())
    }
}
/// One row of the pipeline map: the counters attributed to a single stage.
///
/// The `_us` suffix on the timing fields suggests microseconds; confirm
/// against the code that fills `TranscodeTimingReport`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TranscodePipelineStageReport {
    /// Stage this row describes.
    pub stage: TranscodePipelineStageKind,
    /// Processor classification reported for the stage.
    pub processor: TranscodeStageProcessor,
    /// CPU time attributed to the stage.
    pub cpu_us: u128,
    /// Accelerator (Metal) time attributed to the stage.
    pub metal_us: u128,
    /// Upload or readback time attributed to the stage.
    pub transfer_us: u128,
    /// Number of accelerator dispatches counted for the stage.
    pub dispatches: usize,
    /// Number of jobs counted as CPU fallback for the stage.
    pub fallback_jobs: usize,
    /// Static explanatory text printed after `note=` in the debug report.
    pub note: &'static str,
}
/// The stage suggested as the next candidate for resident (accelerator-side)
/// execution, together with the counters that motivated the suggestion.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct TranscodeResidentStageRecommendation {
    /// Recommended stage.
    pub stage: TranscodePipelineStageKind,
    /// Time figure backing the recommendation (same unit as the `_us`
    /// timing counters it is derived from).
    pub evidence_us: u128,
    /// Dispatch or attempt count backing the recommendation.
    pub evidence_dispatches: usize,
    /// Static explanatory text printed after `reason=` in the debug report.
    pub reason: &'static str,
}
/// Per-stage breakdown of a transcode run plus a next-stage recommendation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TranscodePipelineMap {
    /// Stage rows, in pipeline order.
    pub stages: Vec<TranscodePipelineStageReport>,
    /// Stage suggested as the next residency candidate.
    pub recommendation: TranscodeResidentStageRecommendation,
}
impl TranscodePipelineMap {
#[must_use]
pub fn from_timings(timings: &TranscodeTimingReport) -> Self {
Self {
stages: vec![
entropy_decode_stage(timings),
coefficient_prep_stage(timings),
transform_stage(timings),
quantization_code_block_stage(timings),
packetization_stage(timings),
codestream_assembly_stage(timings),
],
recommendation: recommend_next_resident_stage(timings),
}
}
#[must_use]
pub fn debug_report(&self) -> String {
let mut output = String::from("jpeg_to_htj2k_pipeline_map\n");
for stage in &self.stages {
writeln!(
output,
"stage={} processor={} cpu_us={} metal_us={} transfer_us={} dispatches={} fallback_jobs={} note={}",
stage.stage,
stage.processor,
stage.cpu_us,
stage.metal_us,
stage.transfer_us,
stage.dispatches,
stage.fallback_jobs,
stage.note,
)
.expect("writing a transcode pipeline debug report to a String cannot fail");
}
writeln!(
output,
"recommend_next_stage={} evidence_us={} evidence_dispatches={} reason={}",
self.recommendation.stage,
self.recommendation.evidence_us,
self.recommendation.evidence_dispatches,
self.recommendation.reason,
)
.expect("writing a transcode pipeline recommendation to a String cannot fail");
output
}
}
impl TranscodeTimingReport {
    /// Builds the [`TranscodePipelineMap`] for these timings.
    ///
    /// Convenience wrapper around [`TranscodePipelineMap::from_timings`].
    #[must_use]
    pub fn pipeline_map(&self) -> TranscodePipelineMap {
        TranscodePipelineMap::from_timings(self)
    }
}
impl TranscodeReport {
    /// Builds the pipeline map from this report's `timings` field.
    #[must_use]
    pub fn pipeline_map(&self) -> TranscodePipelineMap {
        let timings = &self.timings;
        timings.pipeline_map()
    }
}
impl BatchTranscodeReport {
    /// Builds the pipeline map from this batch report's `timings` field.
    #[must_use]
    pub fn pipeline_map(&self) -> TranscodePipelineMap {
        let timings = &self.timings;
        timings.pipeline_map()
    }
}
/// Builds the entropy-decode row.
///
/// The stage is always classified as CPU; its only counter is
/// `jpeg_dct_extract_us`, reported as CPU time.
fn entropy_decode_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let extract_us = timings.jpeg_dct_extract_us;

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::EntropyDecode,
        processor: TranscodeStageProcessor::Cpu,
        cpu_us: extract_us,
        metal_us: 0,
        transfer_us: 0,
        dispatches: 0,
        fallback_jobs: 0,
        note: "JPEG marker parsing, entropy decode, dequantization, and DCT coefficient extraction stay on CPU",
    }
}
/// Builds the coefficient-prep row.
///
/// CPU time is `jpeg_dct_repack_us` and transfer time is
/// `dwt97_batch_pack_upload_us`. The stage counts one dispatch when any
/// upload time was recorded and zero otherwise.
fn coefficient_prep_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let repack_us = timings.jpeg_dct_repack_us;
    let upload_us = timings.dwt97_batch_pack_upload_us;
    // Upload time is the only evidence of a dispatch here, so it maps to 0 or 1.
    let upload_dispatches = usize::from(upload_us > 0);
    let processor = processor_for(repack_us, 0, upload_us, upload_dispatches, 0);

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::CoefficientPrep,
        processor,
        cpu_us: repack_us,
        metal_us: 0,
        transfer_us: upload_us,
        dispatches: upload_dispatches,
        fallback_jobs: 0,
        note: "DCT coefficient repack and Metal buffer pack/upload are visible before transform dispatch",
    }
}
/// Builds the transform row.
///
/// Metal time prefers the sum of the two `dwt97_batch_*_lift_us` kernel
/// counters. When that sum is zero but accelerator dispatches were recorded,
/// it falls back to `dct_to_wavelet_accelerator_us`. CPU time is the
/// saturating sum of `dct_to_wavelet_cpu_fallback_us` and `dwt_decompose_us`.
fn transform_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let accelerator_dispatches = timings.accelerator_dispatches;
    let fallback_jobs = timings.cpu_fallback_jobs;
    let readback_us = timings.dwt97_batch_readback_us;

    let lift_kernels_us = timings
        .dwt97_batch_idct_row_lift_us
        .saturating_add(timings.dwt97_batch_column_lift_us);
    let metal_us = match (lift_kernels_us, accelerator_dispatches) {
        // No kernel timings and no dispatches: nothing ran on the accelerator.
        (0, 0) => 0,
        // Dispatches without per-kernel timings: use the aggregate counter.
        (0, _) => timings.dct_to_wavelet_accelerator_us,
        (kernel_us, _) => kernel_us,
    };

    let cpu_us = timings
        .dct_to_wavelet_cpu_fallback_us
        .saturating_add(timings.dwt_decompose_us);

    let processor = processor_for(
        cpu_us,
        metal_us,
        readback_us,
        accelerator_dispatches,
        fallback_jobs,
    );

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::Transform,
        processor,
        cpu_us,
        metal_us,
        transfer_us: readback_us,
        dispatches: accelerator_dispatches,
        fallback_jobs,
        note: "DCT-grid to DWT projection uses accelerator dispatches when available; Ok(None) jobs remain caller CPU fallback",
    }
}
/// Builds the quantization / code-block prep row.
///
/// Metal time is the saturating sum of the quantize/code-block, HT encode,
/// HT kernel and HT compact counters. Transfer time is the saturating sum of
/// the HT status and HT output readback counters. Dispatches combine the
/// batch HT code-block dispatches with the encoder's HT code-block
/// dispatches. No CPU time or fallback jobs are attributed to this stage.
fn quantization_code_block_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let metal_us = [
        timings.dwt97_batch_quantize_codeblock_us,
        timings.dwt97_batch_ht_encode_us,
        timings.dwt97_batch_ht_kernel_us,
        timings.dwt97_batch_ht_compact_us,
    ]
    .iter()
    .fold(0u128, |total, &part| total.saturating_add(part));

    let readback_us = timings
        .dwt97_batch_ht_status_readback_us
        .saturating_add(timings.dwt97_batch_ht_output_readback_us);

    let code_block_dispatches = timings
        .dwt97_batch_ht_codeblock_dispatches
        .saturating_add(timings.htj2k_encode_ht_code_block_dispatches);

    let processor = processor_for(0, metal_us, readback_us, code_block_dispatches, 0);

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
        processor,
        cpu_us: 0,
        metal_us,
        transfer_us: readback_us,
        dispatches: code_block_dispatches,
        fallback_jobs: 0,
        note: "9/7 quantization/code-block layout is only isolated when a backend reports resident stage timings; otherwise it is inside native encode time",
    }
}
/// Builds the packetization row.
///
/// Only `htj2k_encode_packetization_dispatches` is attributed to this stage;
/// all time counters are reported as zero. The processor classification is
/// derived from the dispatch count alone.
fn packetization_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let packet_dispatches = timings.htj2k_encode_packetization_dispatches;
    let processor = processor_for(0, 0, 0, packet_dispatches, 0);

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::Packetization,
        processor,
        cpu_us: 0,
        metal_us: 0,
        transfer_us: 0,
        dispatches: packet_dispatches,
        fallback_jobs: 0,
        note: "Packetization dispatches are counted separately when an encode-stage accelerator handles them; CPU time is otherwise inside native encode time",
    }
}
/// Builds the codestream-assembly row.
///
/// The stage is always classified as CPU; its only counter is
/// `htj2k_encode_us`, reported as CPU time.
fn codestream_assembly_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
    let encode_us = timings.htj2k_encode_us;

    TranscodePipelineStageReport {
        stage: TranscodePipelineStageKind::CodestreamAssembly,
        processor: TranscodeStageProcessor::Cpu,
        cpu_us: encode_us,
        metal_us: 0,
        transfer_us: 0,
        dispatches: 0,
        fallback_jobs: 0,
        note: "Final marker, tile-part, packet byte ordering, and codestream assembly remain at the CPU/native encoder boundary",
    }
}
fn recommend_next_resident_stage(
timings: &TranscodeTimingReport,
) -> TranscodeResidentStageRecommendation {
let transform_readback_us = timings.dwt97_batch_readback_us;
let code_block_readback_us = timings
.dwt97_batch_ht_status_readback_us
.saturating_add(timings.dwt97_batch_ht_output_readback_us);
let has_resident_transform = timings.accelerator_dispatches > 0
&& (timings.dwt97_batch_idct_row_lift_us > 0
|| timings.dwt97_batch_column_lift_us > 0
|| timings.dct_to_wavelet_accelerator_us > 0);
if timings.cpu_fallback_jobs > 0 && timings.accelerator_dispatches == 0 {
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::Transform,
evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
evidence_dispatches: timings.accelerator_attempts,
reason: "transform jobs are still completing through caller CPU fallback",
};
}
if has_resident_transform && timings.dwt97_batch_quantize_codeblock_us == 0 {
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
evidence_us: transform_readback_us.saturating_add(timings.htj2k_encode_us),
evidence_dispatches: timings.accelerator_dispatches,
reason: "resident transform output is read back before quantization/code-block prep and native encode",
};
}
if timings.dwt97_batch_quantize_codeblock_us > 0
&& timings.dwt97_batch_ht_codeblock_dispatches == 0
{
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
evidence_us: transform_readback_us
.saturating_add(code_block_readback_us)
.saturating_add(timings.htj2k_encode_us),
evidence_dispatches: timings.accelerator_dispatches,
reason: "Metal already reaches 9/7 code-block prep; extend residency through HT code-block encode before packetization",
};
}
if timings.cpu_fallback_jobs > 0 {
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::Transform,
evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
evidence_dispatches: timings.accelerator_attempts,
reason: "some transform jobs still use CPU fallback after accelerator attempts",
};
}
let coefficient_prep_us = timings
.jpeg_dct_repack_us
.saturating_add(timings.dwt97_batch_pack_upload_us);
if coefficient_prep_us > 0 {
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::CoefficientPrep,
evidence_us: coefficient_prep_us,
evidence_dispatches: timings.accelerator_dispatches,
reason:
"coefficient repack and host-to-device upload are visible before Metal work starts",
};
}
if timings.htj2k_encode_packetization_dispatches == 0 && timings.htj2k_encode_us > 0 {
return TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::Packetization,
evidence_us: timings.htj2k_encode_us,
evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
reason:
"packetization and codestream assembly remain inside the CPU/native encode boundary",
};
}
TranscodeResidentStageRecommendation {
stage: TranscodePipelineStageKind::CodestreamAssembly,
evidence_us: timings.htj2k_encode_us,
evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
reason: "no stronger resident-stage candidate is visible from the current counters",
}
}
/// Classifies a stage from its counters.
///
/// Accelerator evidence is any non-zero `metal_us`, `transfer_us` or
/// `dispatches`. CPU evidence is any non-zero `cpu_us` or `fallback_jobs`.
/// Without accelerator evidence the result is `Cpu`, even when every counter
/// is zero. With accelerator evidence it is `Hybrid` when CPU evidence is
/// also present and `Metal` otherwise.
fn processor_for(
    cpu_us: u128,
    metal_us: u128,
    transfer_us: u128,
    dispatches: usize,
    fallback_jobs: usize,
) -> TranscodeStageProcessor {
    let accelerator_used = metal_us > 0 || transfer_us > 0 || dispatches > 0;
    if !accelerator_used {
        return TranscodeStageProcessor::Cpu;
    }

    let cpu_used = cpu_us > 0 || fallback_jobs > 0;
    if cpu_used {
        TranscodeStageProcessor::Hybrid
    } else {
        TranscodeStageProcessor::Metal
    }
}