1use core::fmt::{self, Write as _};
6
7use crate::{BatchTranscodeReport, TranscodeReport, TranscodeTimingReport};
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub enum TranscodePipelineStageKind {
12 EntropyDecode,
14 CoefficientPrep,
16 Transform,
18 QuantizationCodeBlockPrep,
20 Packetization,
22 CodestreamAssembly,
24}
25
26impl TranscodePipelineStageKind {
27 #[must_use]
29 pub const fn as_str(self) -> &'static str {
30 match self {
31 Self::EntropyDecode => "entropy_decode",
32 Self::CoefficientPrep => "coefficient_prep",
33 Self::Transform => "transform",
34 Self::QuantizationCodeBlockPrep => "quantization_code_block_prep",
35 Self::Packetization => "packetization",
36 Self::CodestreamAssembly => "codestream_assembly",
37 }
38 }
39}
40
41impl fmt::Display for TranscodePipelineStageKind {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 f.write_str(self.as_str())
44 }
45}
46
47#[derive(Debug, Clone, Copy, PartialEq, Eq)]
49pub enum TranscodeStageProcessor {
50 Cpu,
52 Metal,
54 Hybrid,
56}
57
58impl TranscodeStageProcessor {
59 #[must_use]
61 pub const fn as_str(self) -> &'static str {
62 match self {
63 Self::Cpu => "Cpu",
64 Self::Metal => "Metal",
65 Self::Hybrid => "Hybrid",
66 }
67 }
68}
69
70impl fmt::Display for TranscodeStageProcessor {
71 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72 f.write_str(self.as_str())
73 }
74}
75
76#[derive(Debug, Clone, Copy, PartialEq, Eq)]
78pub struct TranscodePipelineStageReport {
79 pub stage: TranscodePipelineStageKind,
81 pub processor: TranscodeStageProcessor,
83 pub cpu_us: u128,
85 pub metal_us: u128,
87 pub transfer_us: u128,
89 pub transfer_count: usize,
91 pub transfer_bytes: u64,
93 pub resident_handoff_count: usize,
95 pub dispatches: usize,
97 pub fallback_jobs: usize,
99 pub note: &'static str,
101}
102
103#[derive(Debug, Clone, Copy, PartialEq, Eq)]
105pub struct TranscodeResidentStageRecommendation {
106 pub stage: TranscodePipelineStageKind,
108 pub evidence_us: u128,
110 pub evidence_dispatches: usize,
112 pub reason: &'static str,
114}
115
116#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct TranscodePipelineMap {
119 pub stages: Vec<TranscodePipelineStageReport>,
121 pub recommendation: TranscodeResidentStageRecommendation,
123}
124
125impl TranscodePipelineMap {
126 #[must_use]
128 pub fn from_timings(timings: &TranscodeTimingReport) -> Self {
129 Self {
130 stages: vec![
131 entropy_decode_stage(timings),
132 coefficient_prep_stage(timings),
133 transform_stage(timings),
134 quantization_code_block_stage(timings),
135 packetization_stage(timings),
136 codestream_assembly_stage(timings),
137 ],
138 recommendation: recommend_next_resident_stage(timings),
139 }
140 }
141
142 #[must_use]
144 pub fn debug_report(&self) -> String {
145 let mut output = String::from("jpeg_to_htj2k_pipeline_map\n");
146 for stage in &self.stages {
147 writeln!(
148 output,
149 "stage={} processor={} cpu_us={} metal_us={} transfer_us={} transfer_count={} transfer_bytes={} resident_handoffs={} dispatches={} fallback_jobs={} note={}",
150 stage.stage,
151 stage.processor,
152 stage.cpu_us,
153 stage.metal_us,
154 stage.transfer_us,
155 stage.transfer_count,
156 stage.transfer_bytes,
157 stage.resident_handoff_count,
158 stage.dispatches,
159 stage.fallback_jobs,
160 stage.note,
161 )
162 .expect("writing a transcode pipeline debug report to a String cannot fail");
163 }
164 writeln!(
165 output,
166 "recommend_next_stage={} evidence_us={} evidence_dispatches={} reason={}",
167 self.recommendation.stage,
168 self.recommendation.evidence_us,
169 self.recommendation.evidence_dispatches,
170 self.recommendation.reason,
171 )
172 .expect("writing a transcode pipeline recommendation to a String cannot fail");
173 output
174 }
175}
176
177impl TranscodeTimingReport {
178 #[must_use]
180 pub fn pipeline_map(&self) -> TranscodePipelineMap {
181 TranscodePipelineMap::from_timings(self)
182 }
183}
184
185impl TranscodeReport {
186 #[must_use]
188 pub fn pipeline_map(&self) -> TranscodePipelineMap {
189 self.timings.pipeline_map()
190 }
191}
192
193impl BatchTranscodeReport {
194 #[must_use]
196 pub fn pipeline_map(&self) -> TranscodePipelineMap {
197 self.timings.pipeline_map()
198 }
199}
200
201fn entropy_decode_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
202 TranscodePipelineStageReport {
203 stage: TranscodePipelineStageKind::EntropyDecode,
204 processor: TranscodeStageProcessor::Cpu,
205 cpu_us: timings.jpeg_dct_extract_us,
206 metal_us: 0,
207 transfer_us: 0,
208 transfer_count: 0,
209 transfer_bytes: 0,
210 resident_handoff_count: 0,
211 dispatches: 0,
212 fallback_jobs: 0,
213 note: "JPEG marker parsing, entropy decode, dequantization, and DCT coefficient extraction stay on CPU",
214 }
215}
216
217fn coefficient_prep_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
218 let transfer_us = timings.dwt97_batch_pack_upload_us;
219 let transfer_count = transfer_count_or_timing(
220 timings.dwt97_batch_pack_upload_transfers,
221 transfer_us,
222 timings.dwt97_batch_pack_upload_bytes,
223 );
224 let transfer_dispatch = usize::from(
225 transfer_us > 0 || transfer_count > 0 || timings.dwt97_batch_pack_upload_bytes > 0,
226 );
227 TranscodePipelineStageReport {
228 stage: TranscodePipelineStageKind::CoefficientPrep,
229 processor: processor_for(
230 timings.jpeg_dct_repack_us,
231 0,
232 transfer_us,
233 transfer_dispatch,
234 0,
235 ),
236 cpu_us: timings.jpeg_dct_repack_us,
237 metal_us: 0,
238 transfer_us,
239 transfer_count,
240 transfer_bytes: timings.dwt97_batch_pack_upload_bytes,
241 resident_handoff_count: timings.dwt97_batch_resident_dct_handoff_count,
242 dispatches: transfer_dispatch,
243 fallback_jobs: 0,
244 note: "DCT coefficient repack and Metal buffer pack/upload are visible before transform dispatch",
245 }
246}
247
248fn transform_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
249 let dwt97_kernel_us = timings
250 .dwt97_batch_idct_row_lift_us
251 .saturating_add(timings.dwt97_batch_column_lift_us);
252 let metal_us = if dwt97_kernel_us > 0 {
253 dwt97_kernel_us
254 } else if timings.accelerator_dispatches > 0 {
255 timings.dct_to_wavelet_accelerator_us
256 } else {
257 0
258 };
259 let cpu_us = timings
260 .dct_to_wavelet_cpu_fallback_us
261 .saturating_add(timings.dwt_decompose_us);
262 TranscodePipelineStageReport {
263 stage: TranscodePipelineStageKind::Transform,
264 processor: processor_for(
265 cpu_us,
266 metal_us,
267 timings.dwt97_batch_readback_us,
268 timings.accelerator_dispatches,
269 timings.cpu_fallback_jobs,
270 ),
271 cpu_us,
272 metal_us,
273 transfer_us: timings.dwt97_batch_readback_us,
274 transfer_count: transfer_count_or_timing(
275 timings.dwt97_batch_readback_transfers,
276 timings.dwt97_batch_readback_us,
277 timings.dwt97_batch_readback_bytes,
278 ),
279 transfer_bytes: timings.dwt97_batch_readback_bytes,
280 resident_handoff_count: timings.dwt97_batch_resident_dwt_handoff_count,
281 dispatches: timings.accelerator_dispatches,
282 fallback_jobs: timings.cpu_fallback_jobs,
283 note: "DCT-grid to DWT projection uses accelerator dispatches when available; Ok(None) jobs remain caller CPU fallback",
284 }
285}
286
287fn quantization_code_block_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
288 let metal_us = timings
289 .dwt97_batch_quantize_codeblock_us
290 .saturating_add(timings.dwt97_batch_ht_encode_us)
291 .saturating_add(timings.dwt97_batch_ht_kernel_us)
292 .saturating_add(timings.dwt97_batch_ht_compact_us);
293 let transfer_us = timings
294 .dwt97_batch_ht_status_readback_us
295 .saturating_add(timings.dwt97_batch_ht_output_readback_us);
296 let transfer_count = timings
297 .dwt97_batch_ht_status_readback_transfers
298 .saturating_add(timings.dwt97_batch_ht_output_readback_transfers);
299 let transfer_bytes = timings
300 .dwt97_batch_ht_status_readback_bytes
301 .saturating_add(timings.dwt97_batch_ht_output_readback_bytes);
302 let transfer_count = transfer_count_or_timing(transfer_count, transfer_us, transfer_bytes);
303 let dispatches = timings
304 .dwt97_batch_ht_codeblock_dispatches
305 .saturating_add(timings.htj2k_encode_ht_code_block_dispatches);
306 TranscodePipelineStageReport {
307 stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
308 processor: processor_for(0, metal_us, transfer_us, dispatches, 0),
309 cpu_us: 0,
310 metal_us,
311 transfer_us,
312 transfer_count,
313 transfer_bytes,
314 resident_handoff_count: 0,
315 dispatches,
316 fallback_jobs: 0,
317 note: "9/7 quantization/code-block layout is only isolated when a backend reports resident stage timings; otherwise it is inside native encode time",
318 }
319}
320
321fn packetization_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
322 let dispatches = timings.htj2k_encode_packetization_dispatches;
323 TranscodePipelineStageReport {
324 stage: TranscodePipelineStageKind::Packetization,
325 processor: processor_for(0, 0, 0, dispatches, 0),
326 cpu_us: 0,
327 metal_us: 0,
328 transfer_us: 0,
329 transfer_count: 0,
330 transfer_bytes: 0,
331 resident_handoff_count: 0,
332 dispatches,
333 fallback_jobs: 0,
334 note: "Packetization dispatches are counted separately when an encode-stage accelerator handles them; CPU time is otherwise inside native encode time",
335 }
336}
337
338fn codestream_assembly_stage(timings: &TranscodeTimingReport) -> TranscodePipelineStageReport {
339 TranscodePipelineStageReport {
340 stage: TranscodePipelineStageKind::CodestreamAssembly,
341 processor: TranscodeStageProcessor::Cpu,
342 cpu_us: timings.htj2k_encode_us,
343 metal_us: 0,
344 transfer_us: 0,
345 transfer_count: 0,
346 transfer_bytes: 0,
347 resident_handoff_count: 0,
348 dispatches: 0,
349 fallback_jobs: 0,
350 note: "Final marker, tile-part, packet byte ordering, and codestream assembly remain at the CPU/native encoder boundary",
351 }
352}
353
354fn recommend_next_resident_stage(
355 timings: &TranscodeTimingReport,
356) -> TranscodeResidentStageRecommendation {
357 let transform_readback_us = timings.dwt97_batch_readback_us;
358 let code_block_readback_us = timings
359 .dwt97_batch_ht_status_readback_us
360 .saturating_add(timings.dwt97_batch_ht_output_readback_us);
361 let has_resident_transform = timings.accelerator_dispatches > 0
362 && (timings.dwt97_batch_idct_row_lift_us > 0
363 || timings.dwt97_batch_column_lift_us > 0
364 || timings.dct_to_wavelet_accelerator_us > 0);
365
366 if timings.cpu_fallback_jobs > 0 && timings.accelerator_dispatches == 0 {
367 return TranscodeResidentStageRecommendation {
368 stage: TranscodePipelineStageKind::Transform,
369 evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
370 evidence_dispatches: timings.accelerator_attempts,
371 reason: "transform jobs are still completing through caller CPU fallback",
372 };
373 }
374
375 if has_resident_transform && timings.dwt97_batch_quantize_codeblock_us == 0 {
376 return TranscodeResidentStageRecommendation {
377 stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
378 evidence_us: transform_readback_us.saturating_add(timings.htj2k_encode_us),
379 evidence_dispatches: timings.accelerator_dispatches,
380 reason: "resident transform output is read back before quantization/code-block prep and native encode",
381 };
382 }
383
384 if timings.dwt97_batch_quantize_codeblock_us > 0
385 && timings.dwt97_batch_ht_codeblock_dispatches == 0
386 {
387 return TranscodeResidentStageRecommendation {
388 stage: TranscodePipelineStageKind::QuantizationCodeBlockPrep,
389 evidence_us: transform_readback_us
390 .saturating_add(code_block_readback_us)
391 .saturating_add(timings.htj2k_encode_us),
392 evidence_dispatches: timings.accelerator_dispatches,
393 reason: "Metal already reaches 9/7 code-block prep; extend residency through HT code-block encode before packetization",
394 };
395 }
396
397 if timings.cpu_fallback_jobs > 0 {
398 return TranscodeResidentStageRecommendation {
399 stage: TranscodePipelineStageKind::Transform,
400 evidence_us: timings.dct_to_wavelet_cpu_fallback_us,
401 evidence_dispatches: timings.accelerator_attempts,
402 reason: "some transform jobs still use CPU fallback after accelerator attempts",
403 };
404 }
405
406 let coefficient_prep_us = timings
407 .jpeg_dct_repack_us
408 .saturating_add(timings.dwt97_batch_pack_upload_us);
409 if coefficient_prep_us > 0 {
410 return TranscodeResidentStageRecommendation {
411 stage: TranscodePipelineStageKind::CoefficientPrep,
412 evidence_us: coefficient_prep_us,
413 evidence_dispatches: timings.accelerator_dispatches,
414 reason:
415 "coefficient repack and host-to-device upload are visible before Metal work starts",
416 };
417 }
418
419 if timings.htj2k_encode_packetization_dispatches == 0 && timings.htj2k_encode_us > 0 {
420 return TranscodeResidentStageRecommendation {
421 stage: TranscodePipelineStageKind::Packetization,
422 evidence_us: timings.htj2k_encode_us,
423 evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
424 reason:
425 "packetization and codestream assembly remain inside the CPU/native encode boundary",
426 };
427 }
428
429 TranscodeResidentStageRecommendation {
430 stage: TranscodePipelineStageKind::CodestreamAssembly,
431 evidence_us: timings.htj2k_encode_us,
432 evidence_dispatches: timings.htj2k_encode_accelerator_dispatches,
433 reason: "no stronger resident-stage candidate is visible from the current counters",
434 }
435}
436
437fn processor_for(
438 cpu_us: u128,
439 metal_us: u128,
440 transfer_us: u128,
441 dispatches: usize,
442 fallback_jobs: usize,
443) -> TranscodeStageProcessor {
444 let has_cpu = cpu_us > 0 || fallback_jobs > 0;
445 let has_metal = metal_us > 0 || transfer_us > 0 || dispatches > 0;
446 match (has_cpu, has_metal) {
447 (true, true) => TranscodeStageProcessor::Hybrid,
448 (false, true) => TranscodeStageProcessor::Metal,
449 (_, false) => TranscodeStageProcessor::Cpu,
450 }
451}
452
453fn transfer_count_or_timing(
454 explicit_count: usize,
455 transfer_us: u128,
456 transfer_bytes: u64,
457) -> usize {
458 if explicit_count > 0 {
459 explicit_count
460 } else {
461 usize::from(transfer_us > 0 || transfer_bytes > 0)
462 }
463}