Skip to main content

j2k_transcode/
pipeline_map.rs

// SPDX-License-Identifier: MIT OR Apache-2.0

//! Stage-level residency report for JPEG-to-HTJ2K transcode timings.
4
5use core::fmt::{self, Write as _};
6
7use crate::{BatchTranscodeReport, TranscodeReport, TranscodeTimingReport};
8
/// Logical stages of the JPEG-to-J2K/HTJ2K transcode pipeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TranscodePipelineStageKind {
    /// Parsing JPEG markers, decoding entropy data, and extracting DCT coefficients.
    EntropyDecode,
    /// Repacking coefficients and preparing host/device inputs.
    CoefficientPrep,
    /// Transforming the DCT grid into the wavelet domain.
    Transform,
    /// Quantizing, laying out code-blocks, and other pre-packet code-block work.
    QuantizationCodeBlockPrep,
    /// Forming packet headers and packet bodies.
    Packetization,
    /// Assembling the final markers, tile-parts, and codestream bytes.
    CodestreamAssembly,
}
25
26impl TranscodePipelineStageKind {
27    /// Stable snake-case label used by debug reports and logs.
28    #[must_use]
29    pub const fn as_str(self) -> &'static str {
30        match self {
31            Self::EntropyDecode => "entropy_decode",
32            Self::CoefficientPrep => "coefficient_prep",
33            Self::Transform => "transform",
34            Self::QuantizationCodeBlockPrep => "quantization_code_block_prep",
35            Self::Packetization => "packetization",
36            Self::CodestreamAssembly => "codestream_assembly",
37        }
38    }
39}
40
41impl fmt::Display for TranscodePipelineStageKind {
42    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43        f.write_str(self.as_str())
44    }
45}
46
/// Where a transcode stage was observed to run.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TranscodeStageProcessor {
    /// The stage is currently observed at CPU/native Rust or native encoder boundaries.
    Cpu,
    /// The stage is observed through the Metal/accelerator stage counters.
    Metal,
    /// The existing counters show both CPU and Metal/accelerator work for the stage.
    Hybrid,
}
57
58impl TranscodeStageProcessor {
59    /// Stable label used by debug reports and logs.
60    #[must_use]
61    pub const fn as_str(self) -> &'static str {
62        match self {
63            Self::Cpu => "Cpu",
64            Self::Metal => "Metal",
65            Self::Hybrid => "Hybrid",
66        }
67    }
68}
69
70impl fmt::Display for TranscodeStageProcessor {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        f.write_str(self.as_str())
73    }
74}
75
76/// One stage in a transcode pipeline residency map.
77#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct TranscodePipelineStageReport {
79    /// Logical transcode stage.
80    pub stage: TranscodePipelineStageKind,
81    /// Observed CPU/Metal residency for this stage.
82    pub processor: TranscodeStageProcessor,
83    /// CPU/native time currently visible for this stage, in microseconds.
84    pub cpu_us: u128,
85    /// Metal/accelerator time currently visible for this stage, in microseconds.
86    pub metal_us: u128,
87    /// Host/device transfer time visible for this stage, in microseconds.
88    pub transfer_us: u128,
89    /// Logical host/device transfer operations visible for this stage.
90    pub transfer_count: usize,
91    /// Host/device transfer bytes visible for this stage.
92    pub transfer_bytes: u64,
93    /// Validated resident handoff descriptors visible for this stage.
94    pub resident_handoff_count: usize,
95    /// Dispatches observed for this stage.
96    pub dispatches: usize,
97    /// Component jobs that used CPU fallback at this stage.
98    pub fallback_jobs: usize,
99    /// Short interpretation of the counters behind this stage.
100    pub note: &'static str,
101}
102
103/// Recommended next stage to evaluate for Metal residency.
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub struct TranscodeResidentStageRecommendation {
106    /// Stage that should be evaluated next.
107    pub stage: TranscodePipelineStageKind,
108    /// Existing measured time supporting the recommendation, in microseconds.
109    pub evidence_us: u128,
110    /// Existing dispatch count supporting the recommendation.
111    pub evidence_dispatches: usize,
112    /// Why this stage is the next candidate.
113    pub reason: &'static str,
114}
115
116/// Stage-by-stage transcode residency map derived from existing timings.
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct TranscodePipelineMap {
119    /// Ordered stage reports from JPEG input through codestream output.
120    pub stages: Vec<TranscodePipelineStageReport>,
121    /// Next resident-stage candidate derived from the observed counters.
122    pub recommendation: TranscodeResidentStageRecommendation,
123}
124
125impl TranscodePipelineMap {
126    /// Build a pipeline map from an existing timing report.
127    #[must_use]
128    pub fn from_timings(timings: &TranscodeTimingReport) -> Self {
129        Self {
130            stages: vec![
131                entropy_decode_stage(timings),
132                coefficient_prep_stage(timings),
133                transform_stage(timings),
134                quantization_code_block_stage(timings),
135                packetization_stage(timings),
136                codestream_assembly_stage(timings),
137            ],
138            recommendation: recommend_next_resident_stage(timings),
139        }
140    }
141
142    /// Render a compact, line-oriented report for benchmark/debug output.
143    #[must_use]
144    pub fn debug_report(&self) -> String {
145        let mut output = String::from("jpeg_to_htj2k_pipeline_map\n");
146        for stage in &self.stages {
147            writeln!(
148                output,
149                "stage={} processor={} cpu_us={} metal_us={} transfer_us={} transfer_count={} transfer_bytes={} resident_handoffs={} dispatches={} fallback_jobs={} note={}",
150                stage.stage,
151                stage.processor,
152                stage.cpu_us,
153                stage.metal_us,
154                stage.transfer_us,
155                stage.transfer_count,
156                stage.transfer_bytes,
157                stage.resident_handoff_count,
158                stage.dispatches,
159                stage.fallback_jobs,
160                stage.note,
161            )
162            .expect("writing a transcode pipeline debug report to a String cannot fail");
163        }
164        writeln!(
165            output,
166            "recommend_next_stage={} evidence_us={} evidence_dispatches={} reason={}",
167            self.recommendation.stage,
168            self.recommendation.evidence_us,
169            self.recommendation.evidence_dispatches,
170            self.recommendation.reason,
171        )
172        .expect("writing a transcode pipeline recommendation to a String cannot fail");
173        output
174    }
175}
176
177impl TranscodeTimingReport {
178    /// Convert this timing report into a CPU/Metal transcode pipeline map.
179    #[must_use]
180    pub fn pipeline_map(&self) -> TranscodePipelineMap {
181        TranscodePipelineMap::from_timings(self)
182    }
183}
184
185impl TranscodeReport {
186    /// Convert this transcode report into a CPU/Metal pipeline map.
187    #[must_use]
188    pub fn pipeline_map(&self) -> TranscodePipelineMap {
189        self.timings.pipeline_map()
190    }
191}
192
193impl BatchTranscodeReport {
194    /// Convert this batch transcode report into a CPU/Metal pipeline map.
195    #[must_use]
196    pub fn pipeline_map(&self) -> TranscodePipelineMap {
197        self.timings.pipeline_map()
198    }
199}
200
201fn entropy_decode_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
202    TranscodePipelineStageReport {
203        stage: TranscodePipelineStageKind::EntropyDecode,
204        processor: TranscodeStageProcessor::Cpu,
205        cpu_us: timings.jpeg_dct_extract_us,
206        metal_us: 0,
207        transfer_us: 0,
208        transfer_count: 0,
209        transfer_bytes: 0,
210        resident_handoff_count: 0,
211        dispatches: 0,
212        fallback_jobs: 0,
213        note: "JPEG marker parsing, entropy decode, dequantization, and DCT coefficient extraction stay on CPU",
214    }
215}
216
217fn coefficient_prep_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
218    let transfer_us = timings.dwt97_batch_pack_upload_us;
219    let transfer_count = transfer_count_or_timing(
220        timings.dwt97_batch_pack_upload_transfers,
221        transfer_us,
222        timings.dwt97_batch_pack_upload_bytes,
223    );
224    let transfer_dispatch = usize::from(
225        transfer_us > 0 || transfer_count > 0 || timings.dwt97_batch_pack_upload_bytes > 0,
226    );
227    TranscodePipelineStageReport {
228        stage: TranscodePipelineStageKind::CoefficientPrep,
229        processor: processor_for(
230            timings.jpeg_dct_repack_us,
231            0,
232            transfer_us,
233            transfer_dispatch,
234            0,
235        ),
236        cpu_us: timings.jpeg_dct_repack_us,
237        metal_us: 0,
238        transfer_us,
239        transfer_count,
240        transfer_bytes: timings.dwt97_batch_pack_upload_bytes,
241        resident_handoff_count: timings.dwt97_batch_resident_dct_handoff_count,
242        dispatches: transfer_dispatch,
243        fallback_jobs: 0,
244        note: "DCT coefficient repack and Metal buffer pack/upload are visible before transform dispatch",
245    }
246}
247
248fn transform_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
249    let dwt97_kernel_us = timings
250        .dwt97_batch_idct_row_lift_us
251        .saturating_add(timings.dwt97_batch_column_lift_us);
252    let metal_us = if dwt97_kernel_us > 0 {
253        dwt97_kernel_us
254    } else if timings.accelerator_dispatches > 0 {
255        timings.dct_to_wavelet_accelerator_us
256    } else {
257        0
258    };
259    let cpu_us = timings
260        .dct_to_wavelet_cpu_fallback_us
261        .saturating_add(timings.dwt_decompose_us);
262    TranscodePipelineStageReport {
263        stage: TranscodePipelineStageKind::Transform,
264        processor: processor_for(
265            cpu_us,
266            metal_us,
267            timings.dwt97_batch_readback_us,
268            timings.accelerator_dispatches,
269            timings.cpu_fallback_jobs,
270        ),
271        cpu_us,
272        metal_us,
273        transfer_us: timings.dwt97_batch_readback_us,
274        transfer_count: transfer_count_or_timing(
275            timings.dwt97_batch_readback_transfers,
276            timings.dwt97_batch_readback_us,
277            timings.dwt97_batch_readback_bytes,
278        ),
279        transfer_bytes: timings.dwt97_batch_readback_bytes,
280        resident_handoff_count: timings.dwt97_batch_resident_dwt_handoff_count,
281        dispatches: timings.accelerator_dispatches,
282        fallback_jobs: timings.cpu_fallback_jobs,
283        note: "DCT-grid to DWT projection uses accelerator dispatches when available; Ok(None) jobs remain caller CPU fallback",
284    }
285}
286
287fn quantization_code_block_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
288    let metal_us = timings
289        .dwt97_batch_quantize_codeblock_us
290        .saturating_add(timings.dwt97_batch_ht_encode_us)
291        .saturating_add(timings.dwt97_batch_ht_kernel_us)
292        .saturating_add(timings.dwt97_batch_ht_compact_us);
293    let transfer_us = timings
294        .dwt97_batch_ht_status_readback_us
295        .saturating_add(timings.dwt97_batch_ht_output_readback_us);
296    let transfer_count = timings
297        .dwt97_batch_ht_status_readback_transfers
298        .saturating_add(timings.dwt97_batch_ht_output_readback_transfers);
299    let transfer_bytes = timings
300        .dwt97_batch_ht_status_readback_bytes
301        .saturating_add(timings.dwt97_batch_ht_output_readback_bytes);
302    let transfer_count = transfer_count_or_timing(transfer_count, transfer_us, transfer_bytes);
303    let dispatches = timings
304        .dwt97_batch_ht_codeblock_dispatches
305        .saturating_add(timings.htj2k_encode_ht_code_block_dispatches);
306    TranscodePipelineStageReport {
307        stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
308        processor: processor_for(0, metal_us, transfer_us, dispatches, 0),
309        cpu_us: 0,
310        metal_us,
311        transfer_us,
312        transfer_count,
313        transfer_bytes,
314        resident_handoff_count: 0,
315        dispatches,
316        fallback_jobs: 0,
317        note: "9/7 quantization/code-block layout is only isolated when a backend reports resident stage timings; otherwise it is inside native encode time",
318    }
319}
320
321fn packetization_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
322    let dispatches = timings.htj2k_encode_packetization_dispatches;
323    TranscodePipelineStageReport {
324        stage: TranscodePipelineStageKind::Packetization,
325        processor: processor_for(0, 0, 0, dispatches, 0),
326        cpu_us: 0,
327        metal_us: 0,
328        transfer_us: 0,
329        transfer_count: 0,
330        transfer_bytes: 0,
331        resident_handoff_count: 0,
332        dispatches,
333        fallback_jobs: 0,
334        note: "Packetization dispatches are counted separately when an encode-stage accelerator handles them; CPU time is otherwise inside native encode time",
335    }
336}
337
338fn codestream_assembly_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
339    TranscodePipelineStageReport {
340        stage: TranscodePipelineStageKind::CodestreamAssembly,
341        processor: TranscodeStageProcessor::Cpu,
342        cpu_us: timings.htj2k_encode_us,
343        metal_us: 0,
344        transfer_us: 0,
345        transfer_count: 0,
346        transfer_bytes: 0,
347        resident_handoff_count: 0,
348        dispatches: 0,
349        fallback_jobs: 0,
350        note: "Final marker, tile-part, packet byte ordering, and codestream assembly remain at the CPU/native encoder boundary",
351    }
352}
353
354fn recommend_next_resident_stage(
355    timings: &TranscodeTimingReport,
356) -> TranscodeResidentStageRecommendation {
357    let transform_readback_us = timings.dwt97_batch_readback_us;
358    let code_block_readback_us = timings
359        .dwt97_batch_ht_status_readback_us
360        .saturating_add(timings.dwt97_batch_ht_output_readback_us);
361    let has_resident_transform = timings.accelerator_dispatches > 0
362        && (timings.dwt97_batch_idct_row_lift_us > 0
363            || timings.dwt97_batch_column_lift_us > 0
364            || timings.dct_to_wavelet_accelerator_us > 0);
365
366    if timings.cpu_fallback_jobs > 0 && timings.accelerator_dispatches == 0 {
367        return TranscodeResidentStageRecommendation {
368            stage: TranscodePipelineStageKind::Transform,
369            evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
370            evidence_dispatches: timings.accelerator_attempts,
371            reason: "transform jobs are still completing through caller CPU fallback",
372        };
373    }
374
375    if has_resident_transform && timings.dwt97_batch_quantize_codeblock_us == 0 {
376        return TranscodeResidentStageRecommendation {
377            stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
378            evidence_us: transform_readback_us.saturating_add(timings.htj2k_encode_us),
379            evidence_dispatches: timings.accelerator_dispatches,
380            reason: "resident transform output is read back before quantization/code-block prep and native encode",
381        };
382    }
383
384    if timings.dwt97_batch_quantize_codeblock_us > 0
385        && timings.dwt97_batch_ht_codeblock_dispatches == 0
386    {
387        return TranscodeResidentStageRecommendation {
388            stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
389            evidence_us: transform_readback_us
390                .saturating_add(code_block_readback_us)
391                .saturating_add(timings.htj2k_encode_us),
392            evidence_dispatches: timings.accelerator_dispatches,
393            reason: "Metal already reaches 9/7 code-block prep; extend residency through HT code-block encode before packetization",
394        };
395    }
396
397    if timings.cpu_fallback_jobs > 0 {
398        return TranscodeResidentStageRecommendation {
399            stage: TranscodePipelineStageKind::Transform,
400            evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
401            evidence_dispatches: timings.accelerator_attempts,
402            reason: "some transform jobs still use CPU fallback after accelerator attempts",
403        };
404    }
405
406    let coefficient_prep_us = timings
407        .jpeg_dct_repack_us
408        .saturating_add(timings.dwt97_batch_pack_upload_us);
409    if coefficient_prep_us > 0 {
410        return TranscodeResidentStageRecommendation {
411            stage: TranscodePipelineStageKind::CoefficientPrep,
412            evidence_us: coefficient_prep_us,
413            evidence_dispatches: timings.accelerator_dispatches,
414            reason:
415                "coefficient repack and host-to-device upload are visible before Metal work starts",
416        };
417    }
418
419    if timings.htj2k_encode_packetization_dispatches == 0 && timings.htj2k_encode_us > 0 {
420        return TranscodeResidentStageRecommendation {
421            stage: TranscodePipelineStageKind::Packetization,
422            evidence_us: timings.htj2k_encode_us,
423            evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
424            reason:
425                "packetization and codestream assembly remain inside the CPU/native encode boundary",
426        };
427    }
428
429    TranscodeResidentStageRecommendation {
430        stage: TranscodePipelineStageKind::CodestreamAssembly,
431        evidence_us: timings.htj2k_encode_us,
432        evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
433        reason: "no stronger resident-stage candidate is visible from the current counters",
434    }
435}
436
437fn processor_for(
438    cpu_us: u128,
439    metal_us: u128,
440    transfer_us: u128,
441    dispatches: usize,
442    fallback_jobs: usize,
443) -> TranscodeStageProcessor {
444    let has_cpu = cpu_us > 0 || fallback_jobs > 0;
445    let has_metal = metal_us > 0 || transfer_us > 0 || dispatches > 0;
446    match (has_cpu, has_metal) {
447        (true, true) => TranscodeStageProcessor::Hybrid,
448        (false, true) => TranscodeStageProcessor::Metal,
449        (_, false) => TranscodeStageProcessor::Cpu,
450    }
451}
452
/// Returns the transfer count to report for a stage.
///
/// A nonzero `explicit_count` is returned unchanged. When it is zero, the
/// result is `1` if either `transfer_us` or `transfer_bytes` is nonzero, and
/// `0` otherwise.
fn transfer_count_or_timing(
    explicit_count: usize,
    transfer_us: u128,
    transfer_bytes: u64,
) -> usize {
    match explicit_count {
        // No explicit counter: infer a single transfer from time or bytes.
        0 => usize::from(transfer_us > 0 || transfer_bytes > 0),
        reported => reported,
    }
}
459        explicit_count
460    } else {
461        usize::from(transfer_us > 0 || transfer_bytes > 0)
462    }
463}