Skip to main content

j2k_transcode/
pipeline_map.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Stage-level residency report for JPEG-to-HTJ2K transcode timings.
4
5use core::fmt::{self, Write as _};
6
7use crate::{BatchTranscodeReport, TranscodeReport, TranscodeTimingReport};
8
/// Named phases of the JPEG-to-J2K/HTJ2K transcode pipeline.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TranscodePipelineStageKind {
    /// Parsing JPEG markers, decoding entropy-coded data, and extracting DCT coefficients.
    EntropyDecode,
    /// Repacking coefficients and preparing host/device inputs.
    CoefficientPrep,
    /// Converting the DCT grid into the wavelet domain.
    Transform,
    /// Quantization, code-block layout, and other code-block work done before packets exist.
    QuantizationCodeBlockPrep,
    /// Forming packet headers and packet bodies.
    Packetization,
    /// Assembling the final markers, tile-parts, and codestream bytes.
    CodestreamAssembly,
}

impl TranscodePipelineStageKind {
    /// Returns the fixed snake-case name of this stage, as emitted in debug
    /// reports and logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label = match self {
            Self::EntropyDecode => "entropy_decode",
            Self::CoefficientPrep => "coefficient_prep",
            Self::Transform => "transform",
            Self::QuantizationCodeBlockPrep => "quantization_code_block_prep",
            Self::Packetization => "packetization",
            Self::CodestreamAssembly => "codestream_assembly",
        };
        label
    }
}

impl fmt::Display for TranscodePipelineStageKind {
    /// Writes exactly the text returned by `as_str`; the label is written
    /// raw, so width and fill flags on the formatter are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.as_str();
        formatter.write_str(label)
    }
}
46
/// Where the work of a transcode stage was observed to run.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TranscodeStageProcessor {
    /// The stage is currently observed at CPU/native Rust or native encoder boundaries.
    Cpu,
    /// The stage is observed through the Metal/accelerator stage counters.
    Metal,
    /// The existing counters show both CPU and Metal/accelerator work for the stage.
    Hybrid,
}

impl TranscodeStageProcessor {
    /// Returns the fixed name of this residency class, as emitted in debug
    /// reports and logs.
    #[must_use]
    pub const fn as_str(self) -> &'static str {
        let label = match self {
            Self::Cpu => "Cpu",
            Self::Metal => "Metal",
            Self::Hybrid => "Hybrid",
        };
        label
    }
}

impl fmt::Display for TranscodeStageProcessor {
    /// Writes exactly the text returned by `as_str`; the label is written
    /// raw, so width and fill flags on the formatter are not applied.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = self.as_str();
        formatter.write_str(label)
    }
}
75
76/// One stage in a transcode pipeline residency map.
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct TranscodePipelineStageReport {
79    /// Logical transcode stage.
80    pub stage: TranscodePipelineStageKind,
81    /// Observed CPU/Metal residency for this stage.
82    pub processor: TranscodeStageProcessor,
83    /// CPU/native time currently visible for this stage, in microseconds.
84    pub cpu_us: u128,
85    /// Metal/accelerator time currently visible for this stage, in microseconds.
86    pub metal_us: u128,
87    /// Host/device transfer time visible for this stage, in microseconds.
88    pub transfer_us: u128,
89    /// Dispatches observed for this stage.
90    pub dispatches: usize,
91    /// Component jobs that used CPU fallback at this stage.
92    pub fallback_jobs: usize,
93    /// Short interpretation of the counters behind this stage.
94    pub note: &'static str,
95}
96
97/// Recommended next stage to evaluate for Metal residency.
98#[derive(Debug, Clone, Copy, PartialEq, Eq)]
99pub struct TranscodeResidentStageRecommendation {
100    /// Stage that should be evaluated next.
101    pub stage: TranscodePipelineStageKind,
102    /// Existing measured time supporting the recommendation, in microseconds.
103    pub evidence_us: u128,
104    /// Existing dispatch count supporting the recommendation.
105    pub evidence_dispatches: usize,
106    /// Why this stage is the next candidate.
107    pub reason: &'static str,
108}
109
110/// Stage-by-stage transcode residency map derived from existing timings.
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub struct TranscodePipelineMap {
113    /// Ordered stage reports from JPEG input through codestream output.
114    pub stages: Vec<TranscodePipelineStageReport>,
115    /// Next resident-stage candidate derived from the observed counters.
116    pub recommendation: TranscodeResidentStageRecommendation,
117}
118
119impl TranscodePipelineMap {
120    /// Build a pipeline map from an existing timing report.
121    #[must_use]
122    pub fn from_timings(timings: &TranscodeTimingReport) -> Self {
123        Self {
124            stages: vec![
125                entropy_decode_stage(timings),
126                coefficient_prep_stage(timings),
127                transform_stage(timings),
128                quantization_code_block_stage(timings),
129                packetization_stage(timings),
130                codestream_assembly_stage(timings),
131            ],
132            recommendation: recommend_next_resident_stage(timings),
133        }
134    }
135
136    /// Render a compact, line-oriented report for benchmark/debug output.
137    #[must_use]
138    pub fn debug_report(&self) -> String {
139        let mut output = String::from("jpeg_to_htj2k_pipeline_map\n");
140        for stage in &self.stages {
141            writeln!(
142                output,
143                "stage={} processor={} cpu_us={} metal_us={} transfer_us={} dispatches={} fallback_jobs={} note={}",
144                stage.stage,
145                stage.processor,
146                stage.cpu_us,
147                stage.metal_us,
148                stage.transfer_us,
149                stage.dispatches,
150                stage.fallback_jobs,
151                stage.note,
152            )
153            .expect("writing a transcode pipeline debug report to a String cannot fail");
154        }
155        writeln!(
156            output,
157            "recommend_next_stage={} evidence_us={} evidence_dispatches={} reason={}",
158            self.recommendation.stage,
159            self.recommendation.evidence_us,
160            self.recommendation.evidence_dispatches,
161            self.recommendation.reason,
162        )
163        .expect("writing a transcode pipeline recommendation to a String cannot fail");
164        output
165    }
166}
167
168impl TranscodeTimingReport {
169    /// Convert this timing report into a CPU/Metal transcode pipeline map.
170    #[must_use]
171    pub fn pipeline_map(&self) -> TranscodePipelineMap {
172        TranscodePipelineMap::from_timings(self)
173    }
174}
175
176impl TranscodeReport {
177    /// Convert this transcode report into a CPU/Metal pipeline map.
178    #[must_use]
179    pub fn pipeline_map(&self) -> TranscodePipelineMap {
180        self.timings.pipeline_map()
181    }
182}
183
184impl BatchTranscodeReport {
185    /// Convert this batch transcode report into a CPU/Metal pipeline map.
186    #[must_use]
187    pub fn pipeline_map(&self) -> TranscodePipelineMap {
188        self.timings.pipeline_map()
189    }
190}
191
192fn entropy_decode_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
193    TranscodePipelineStageReport {
194        stage: TranscodePipelineStageKind::EntropyDecode,
195        processor: TranscodeStageProcessor::Cpu,
196        cpu_us: timings.jpeg_dct_extract_us,
197        metal_us: 0,
198        transfer_us: 0,
199        dispatches: 0,
200        fallback_jobs: 0,
201        note: "JPEG marker parsing, entropy decode, dequantization, and DCT coefficient extraction stay on CPU",
202    }
203}
204
205fn coefficient_prep_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
206    let transfer_us = timings.dwt97_batch_pack_upload_us;
207    TranscodePipelineStageReport {
208        stage: TranscodePipelineStageKind::CoefficientPrep,
209        processor: processor_for(
210            timings.jpeg_dct_repack_us,
211            0,
212            transfer_us,
213            usize::from(transfer_us > 0),
214            0,
215        ),
216        cpu_us: timings.jpeg_dct_repack_us,
217        metal_us: 0,
218        transfer_us,
219        dispatches: usize::from(transfer_us > 0),
220        fallback_jobs: 0,
221        note: "DCT coefficient repack and Metal buffer pack/upload are visible before transform dispatch",
222    }
223}
224
225fn transform_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
226    let dwt97_kernel_us = timings
227        .dwt97_batch_idct_row_lift_us
228        .saturating_add(timings.dwt97_batch_column_lift_us);
229    let metal_us = if dwt97_kernel_us > 0 {
230        dwt97_kernel_us
231    } else if timings.accelerator_dispatches > 0 {
232        timings.dct_to_wavelet_accelerator_us
233    } else {
234        0
235    };
236    let cpu_us = timings
237        .dct_to_wavelet_cpu_fallback_us
238        .saturating_add(timings.dwt_decompose_us);
239    TranscodePipelineStageReport {
240        stage: TranscodePipelineStageKind::Transform,
241        processor: processor_for(
242            cpu_us,
243            metal_us,
244            timings.dwt97_batch_readback_us,
245            timings.accelerator_dispatches,
246            timings.cpu_fallback_jobs,
247        ),
248        cpu_us,
249        metal_us,
250        transfer_us: timings.dwt97_batch_readback_us,
251        dispatches: timings.accelerator_dispatches,
252        fallback_jobs: timings.cpu_fallback_jobs,
253        note: "DCT-grid to DWT projection uses accelerator dispatches when available; Ok(None) jobs remain caller CPU fallback",
254    }
255}
256
257fn quantization_code_block_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
258    let metal_us = timings
259        .dwt97_batch_quantize_codeblock_us
260        .saturating_add(timings.dwt97_batch_ht_encode_us)
261        .saturating_add(timings.dwt97_batch_ht_kernel_us)
262        .saturating_add(timings.dwt97_batch_ht_compact_us);
263    let transfer_us = timings
264        .dwt97_batch_ht_status_readback_us
265        .saturating_add(timings.dwt97_batch_ht_output_readback_us);
266    let dispatches = timings
267        .dwt97_batch_ht_codeblock_dispatches
268        .saturating_add(timings.htj2k_encode_ht_code_block_dispatches);
269    TranscodePipelineStageReport {
270        stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
271        processor: processor_for(0, metal_us, transfer_us, dispatches, 0),
272        cpu_us: 0,
273        metal_us,
274        transfer_us,
275        dispatches,
276        fallback_jobs: 0,
277        note: "9/7 quantization/code-block layout is only isolated when a backend reports resident stage timings; otherwise it is inside native encode time",
278    }
279}
280
281fn packetization_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
282    let dispatches = timings.htj2k_encode_packetization_dispatches;
283    TranscodePipelineStageReport {
284        stage: TranscodePipelineStageKind::Packetization,
285        processor: processor_for(0, 0, 0, dispatches, 0),
286        cpu_us: 0,
287        metal_us: 0,
288        transfer_us: 0,
289        dispatches,
290        fallback_jobs: 0,
291        note: "Packetization dispatches are counted separately when an encode-stage accelerator handles them; CPU time is otherwise inside native encode time",
292    }
293}
294
295fn codestream_assembly_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
296    TranscodePipelineStageReport {
297        stage: TranscodePipelineStageKind::CodestreamAssembly,
298        processor: TranscodeStageProcessor::Cpu,
299        cpu_us: timings.htj2k_encode_us,
300        metal_us: 0,
301        transfer_us: 0,
302        dispatches: 0,
303        fallback_jobs: 0,
304        note: "Final marker, tile-part, packet byte ordering, and codestream assembly remain at the CPU/native encoder boundary",
305    }
306}
307
308fn recommend_next_resident_stage(
309    timings: &TranscodeTimingReport,
310) -> TranscodeResidentStageRecommendation {
311    let transform_readback_us = timings.dwt97_batch_readback_us;
312    let code_block_readback_us = timings
313        .dwt97_batch_ht_status_readback_us
314        .saturating_add(timings.dwt97_batch_ht_output_readback_us);
315    let has_resident_transform = timings.accelerator_dispatches > 0
316        && (timings.dwt97_batch_idct_row_lift_us > 0
317            || timings.dwt97_batch_column_lift_us > 0
318            || timings.dct_to_wavelet_accelerator_us > 0);
319
320    if timings.cpu_fallback_jobs > 0 && timings.accelerator_dispatches == 0 {
321        return TranscodeResidentStageRecommendation {
322            stage: TranscodePipelineStageKind::Transform,
323            evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
324            evidence_dispatches: timings.accelerator_attempts,
325            reason: "transform jobs are still completing through caller CPU fallback",
326        };
327    }
328
329    if has_resident_transform && timings.dwt97_batch_quantize_codeblock_us == 0 {
330        return TranscodeResidentStageRecommendation {
331            stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
332            evidence_us: transform_readback_us.saturating_add(timings.htj2k_encode_us),
333            evidence_dispatches: timings.accelerator_dispatches,
334            reason: "resident transform output is read back before quantization/code-block prep and native encode",
335        };
336    }
337
338    if timings.dwt97_batch_quantize_codeblock_us > 0
339        && timings.dwt97_batch_ht_codeblock_dispatches == 0
340    {
341        return TranscodeResidentStageRecommendation {
342            stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
343            evidence_us: transform_readback_us
344                .saturating_add(code_block_readback_us)
345                .saturating_add(timings.htj2k_encode_us),
346            evidence_dispatches: timings.accelerator_dispatches,
347            reason: "Metal already reaches 9/7 code-block prep; extend residency through HT code-block encode before packetization",
348        };
349    }
350
351    if timings.cpu_fallback_jobs > 0 {
352        return TranscodeResidentStageRecommendation {
353            stage: TranscodePipelineStageKind::Transform,
354            evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
355            evidence_dispatches: timings.accelerator_attempts,
356            reason: "some transform jobs still use CPU fallback after accelerator attempts",
357        };
358    }
359
360    let coefficient_prep_us = timings
361        .jpeg_dct_repack_us
362        .saturating_add(timings.dwt97_batch_pack_upload_us);
363    if coefficient_prep_us > 0 {
364        return TranscodeResidentStageRecommendation {
365            stage: TranscodePipelineStageKind::CoefficientPrep,
366            evidence_us: coefficient_prep_us,
367            evidence_dispatches: timings.accelerator_dispatches,
368            reason:
369                "coefficient repack and host-to-device upload are visible before Metal work starts",
370        };
371    }
372
373    if timings.htj2k_encode_packetization_dispatches == 0 && timings.htj2k_encode_us > 0 {
374        return TranscodeResidentStageRecommendation {
375            stage: TranscodePipelineStageKind::Packetization,
376            evidence_us: timings.htj2k_encode_us,
377            evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
378            reason:
379                "packetization and codestream assembly remain inside the CPU/native encode boundary",
380        };
381    }
382
383    TranscodeResidentStageRecommendation {
384        stage: TranscodePipelineStageKind::CodestreamAssembly,
385        evidence_us: timings.htj2k_encode_us,
386        evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
387        reason: "no stronger resident-stage candidate is visible from the current counters",
388    }
389}
390
391fn processor_for(
392    cpu_us: u128,
393    metal_us: u128,
394    transfer_us: u128,
395    dispatches: usize,
396    fallback_jobs: usize,
397) -> TranscodeStageProcessor {
398    let has_cpu = cpu_us > 0 || fallback_jobs > 0;
399    let has_metal = metal_us > 0 || transfer_us > 0 || dispatches > 0;
400    match (has_cpu, has_metal) {
401        (true, true) => TranscodeStageProcessor::Hybrid,
402        (false, true) => TranscodeStageProcessor::Metal,
403        (_, false) => TranscodeStageProcessor::Cpu,
404    }
405}