Skip to main content

wsi_rs/core/
decode_runtime.rs

1use crate::core::registry::SlideReader;
2use crate::core::types::{
3    CpuTile, Dataset, Level, OutputBackendRequest, TileCodecKind, TileLayout, TileOutputPreference,
4    TilePixels, TileRequest,
5};
6use crate::error::WsiError;
7use rayon::ThreadPool;
8use std::cell::RefCell;
9use std::collections::{HashMap, VecDeque};
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{Duration, Instant};
13
/// Default maximum number of leading tiles in a batch that are decoded to time
/// the candidate decode routes.
const DEFAULT_ROUTE_SAMPLE_SIZE: usize = 32;
/// Batches with at least this many tiles skip route sampling and are sent
/// straight through the caller's requested output.
const DIRECT_DEVICE_BATCH_THRESHOLD: usize = 8;
/// The device route wins only when its sample time is at most this fraction of
/// the CPU sample time (i.e. it must be roughly 15% faster or better).
const DEVICE_WIN_RATIO: f64 = 0.85;
/// Upper bound on cached route decisions; the oldest inserted key is evicted first.
const ROUTE_CACHE_MAX_ENTRIES: usize = 1024;
18
thread_local! {
    /// Runtime installed on this thread by `DecodeRuntime::with_current` for the
    /// duration of a call; read back through `current_decode_runtime`.
    static CURRENT_DECODE_RUNTIME: RefCell<Option<Arc<DecodeRuntime>>> = const { RefCell::new(None) };
}
22
/// Tuning knobs for tile decode execution.
///
/// Start from `DecodeExecutionOptions::default()` and adjust with the `with_*`
/// builder methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub struct DecodeExecutionOptions {
    /// Size of the dedicated JP2K CPU pool; `None` sizes it from the available
    /// parallelism of the machine.
    jp2k_cpu_threads: Option<NonZeroUsize>,
    /// Maximum number of tiles decoded when measuring a decode route (clamped to
    /// at least 1 by `with_route_sample_size`).
    route_sample_size: usize,
}
29
30impl DecodeExecutionOptions {
31    pub fn with_jp2k_cpu_threads(mut self, threads: NonZeroUsize) -> Self {
32        self.jp2k_cpu_threads = Some(threads);
33        self
34    }
35
36    pub fn with_route_sample_size(mut self, sample_size: usize) -> Self {
37        self.route_sample_size = sample_size.max(1);
38        self
39    }
40
41    pub fn jp2k_cpu_threads(&self) -> Option<NonZeroUsize> {
42        self.jp2k_cpu_threads
43    }
44
45    pub fn route_sample_size(&self) -> usize {
46        self.route_sample_size
47    }
48}
49
50impl Default for DecodeExecutionOptions {
51    fn default() -> Self {
52        Self {
53            jp2k_cpu_threads: None,
54            route_sample_size: DEFAULT_ROUTE_SAMPLE_SIZE,
55        }
56    }
57}
58
/// Decode path selected for a batch of tiles.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DecodeRoute {
    /// Decode with CPU output (`TileOutputPreference::cpu()`).
    Cpu,
    /// Decode with the caller's requested, device-preferring output.
    Device,
}
65
/// Outcome of timing a sample of tiles on the device-preferred and CPU paths.
#[derive(Debug, Clone, PartialEq)]
#[non_exhaustive]
pub struct DecodeRouteDecision {
    /// Route used for batches that share this decision's route key.
    pub winner: DecodeRoute,
    /// Number of tiles returned by the measured sample read.
    pub sample_tile_count: usize,
    /// Wall-clock time of the CPU sample read. When the device-preferred read
    /// returned no device tiles, no separate CPU read is made and this holds
    /// that read's time instead.
    pub cpu_elapsed: Duration,
    /// Wall-clock time of the device-preferred sample read.
    pub device_elapsed: Duration,
    /// How many tiles of the device-preferred sample came back as device tiles.
    pub device_tile_count: usize,
}
75
76impl DecodeRouteDecision {
77    pub fn measured(
78        sample_tile_count: usize,
79        cpu_elapsed: Duration,
80        device_elapsed: Duration,
81        device_tile_count: usize,
82    ) -> Self {
83        Self {
84            winner: Self::winner_for_measurement(cpu_elapsed, device_elapsed, device_tile_count),
85            sample_tile_count,
86            cpu_elapsed,
87            device_elapsed,
88            device_tile_count,
89        }
90    }
91
92    pub fn winner_for_measurement(
93        cpu_elapsed: Duration,
94        device_elapsed: Duration,
95        device_tile_count: usize,
96    ) -> DecodeRoute {
97        let cpu_ms = cpu_elapsed.as_secs_f64() * 1000.0;
98        let device_ms = device_elapsed.as_secs_f64() * 1000.0;
99        if device_tile_count > 0 && cpu_ms > 0.0 && device_ms <= cpu_ms * DEVICE_WIN_RATIO {
100            DecodeRoute::Device
101        } else {
102            DecodeRoute::Cpu
103        }
104    }
105}
106
/// A route decision together with the tiles decoded while measuring it, so the
/// sample does not have to be decoded a second time.
struct MeasuredDecodeRoute {
    decision: DecodeRouteDecision,
    /// Tiles decoded for the sample on the route that `decision` selected (or
    /// from the device-preferred read when it produced no device tiles).
    sample_tiles: Vec<TilePixels>,
}
111
/// Shared decode execution state: the options it was built from, an optional
/// dedicated JP2K CPU thread pool, and a bounded cache of measured routes.
#[derive(Debug)]
pub(crate) struct DecodeRuntime {
    options: DecodeExecutionOptions,
    /// `None` means JP2K CPU work passed to `install_jp2k_cpu` runs inline on
    /// the calling thread.
    jp2k_cpu_pool: Option<ThreadPool>,
    /// Behind a mutex so route decisions can be read and stored through `&self`.
    route_cache: Mutex<DecodeRouteCache>,
}
118
119impl DecodeRuntime {
120    pub(crate) fn new(options: DecodeExecutionOptions) -> Result<Self, WsiError> {
121        Self::build(options, true)
122    }
123
124    pub(crate) fn arc_for_options(options: DecodeExecutionOptions) -> Result<Arc<Self>, WsiError> {
125        if options == DecodeExecutionOptions::default() {
126            Ok(Self::default_arc())
127        } else {
128            Ok(Arc::new(Self::new(options)?))
129        }
130    }
131
132    fn build(options: DecodeExecutionOptions, fail_on_pool_error: bool) -> Result<Self, WsiError> {
133        let threads = options
134            .jp2k_cpu_threads
135            .map_or_else(default_jp2k_cpu_threads, NonZeroUsize::get);
136        let jp2k_cpu_pool = match rayon::ThreadPoolBuilder::new()
137            .num_threads(threads)
138            .thread_name(|index| format!("wsi_rs-jp2k-cpu-{index}"))
139            .build()
140        {
141            Ok(pool) => Some(pool),
142            Err(err) if fail_on_pool_error => {
143                return Err(WsiError::Unsupported {
144                    reason: format!("failed to initialize JP2K CPU decode pool: {err}"),
145                });
146            }
147            Err(err) => {
148                tracing::error!(
149                    error = %err,
150                    "failed to initialize default JP2K CPU decode pool; falling back to inline decode"
151                );
152                None
153            }
154        };
155        Ok(Self {
156            options,
157            jp2k_cpu_pool,
158            route_cache: Mutex::new(DecodeRouteCache::new()),
159        })
160    }
161
162    pub(crate) fn default_arc() -> Arc<Self> {
163        static DEFAULT_RUNTIME: OnceLock<Arc<DecodeRuntime>> = OnceLock::new();
164        DEFAULT_RUNTIME
165            .get_or_init(|| {
166                Arc::new(match Self::build(DecodeExecutionOptions::default(), false) {
167                    Ok(runtime) => runtime,
168                    Err(err) => {
169                        tracing::error!(
170                            error = %err,
171                            "failed to initialize default decode runtime; falling back to inline decode"
172                        );
173                        Self::inline(DecodeExecutionOptions::default())
174                    }
175                })
176            })
177            .clone()
178    }
179
180    fn inline(options: DecodeExecutionOptions) -> Self {
181        Self {
182            options,
183            jp2k_cpu_pool: None,
184            route_cache: Mutex::new(DecodeRouteCache::new()),
185        }
186    }
187
188    pub(crate) fn install_jp2k_cpu<R: Send>(&self, op: impl FnOnce() -> R + Send) -> R {
189        if let Some(pool) = &self.jp2k_cpu_pool {
190            pool.install(op)
191        } else {
192            op()
193        }
194    }
195
196    pub(crate) fn has_jp2k_cpu_pool(&self) -> bool {
197        self.jp2k_cpu_pool.is_some()
198    }
199
200    pub(crate) fn options(&self) -> DecodeExecutionOptions {
201        self.options
202    }
203
204    pub(crate) fn with_current<T>(self: &Arc<Self>, f: impl FnOnce() -> T) -> T {
205        struct Restore(Option<Arc<DecodeRuntime>>);
206        impl Drop for Restore {
207            fn drop(&mut self) {
208                let previous = self.0.take();
209                CURRENT_DECODE_RUNTIME.with(|slot| {
210                    *slot.borrow_mut() = previous;
211                });
212            }
213        }
214
215        let previous = CURRENT_DECODE_RUNTIME.with(|slot| slot.replace(Some(self.clone())));
216        let _restore = Restore(previous);
217        f()
218    }
219
220    fn cached_route(&self, key: &DecodeRouteKey) -> Option<DecodeRouteDecision> {
221        self.route_cache
222            .lock()
223            .unwrap_or_else(|err| err.into_inner())
224            .get(key)
225    }
226
227    fn store_route(&self, key: DecodeRouteKey, decision: DecodeRouteDecision) {
228        self.route_cache
229            .lock()
230            .unwrap_or_else(|err| err.into_inner())
231            .insert(key, decision);
232    }
233}
234
/// Route decisions keyed by batch shape, bounded to `ROUTE_CACHE_MAX_ENTRIES`
/// entries with first-in, first-out eviction.
#[derive(Debug)]
struct DecodeRouteCache {
    entries: HashMap<DecodeRouteKey, DecodeRouteDecision>,
    /// Keys in the order they were first inserted; the front is evicted first.
    /// Overwriting an existing key does not move it.
    insertion_order: VecDeque<DecodeRouteKey>,
}
240
241impl DecodeRouteCache {
242    fn new() -> Self {
243        Self {
244            entries: HashMap::new(),
245            insertion_order: VecDeque::new(),
246        }
247    }
248
249    fn get(&self, key: &DecodeRouteKey) -> Option<DecodeRouteDecision> {
250        self.entries.get(key).cloned()
251    }
252
253    fn insert(&mut self, key: DecodeRouteKey, decision: DecodeRouteDecision) {
254        if !self.entries.contains_key(&key) {
255            while self.entries.len() >= ROUTE_CACHE_MAX_ENTRIES {
256                let Some(evicted) = self.insertion_order.pop_front() else {
257                    break;
258                };
259                self.entries.remove(&evicted);
260            }
261            self.insertion_order.push_back(key.clone());
262        }
263        self.entries.insert(key, decision);
264    }
265
266    #[cfg(test)]
267    fn len(&self) -> usize {
268        self.entries.len()
269    }
270}
271
272pub(crate) fn current_decode_runtime() -> Option<Arc<DecodeRuntime>> {
273    CURRENT_DECODE_RUNTIME.with(|slot| slot.borrow().clone())
274}
275
/// Number of JP2K CPU decode threads used when none is configured: the
/// machine's available parallelism, or 1 if that cannot be determined.
fn default_jp2k_cpu_threads() -> usize {
    match std::thread::available_parallelism() {
        // `NonZeroUsize` already guarantees at least one thread.
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
281
/// Identifies batches that may share a measured route: same dataset, scene,
/// series, level, tile grid, codec, requested backend/device, and sample size.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DecodeRouteKey {
    dataset_id: u128,
    scene: usize,
    series: usize,
    level: u32,
    tile_grid: RouteTileGrid,
    codec_kind: TileCodecKind,
    output_backend: OutputBackendRequest,
    /// Debug-formatted backend, plus the concrete device identity when a Metal
    /// or CUDA session is attached (built by `device_backend_identity`).
    device_backend_identity: String,
    /// `min(batch length, route sample size)`, so a route measured on a small
    /// batch is not reused for larger ones.
    sample_tile_count: usize,
}
294
/// Geometry of a level's regular tile grid, used as part of a route key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct RouteTileGrid {
    tile_width: u32,
    tile_height: u32,
    tiles_across: u64,
    tiles_down: u64,
}
302
/// `SlideReader` wrapper that installs a `DecodeRuntime` around delegated decode
/// calls and, for eligible JP2K/HTJ2K batches, chooses between CPU and
/// device-preferred decoding based on a measured sample.
pub(crate) struct AdaptiveDecodeReader {
    inner: Box<dyn SlideReader>,
    runtime: Arc<DecodeRuntime>,
}
307
308impl AdaptiveDecodeReader {
309    pub(crate) fn new(inner: Box<dyn SlideReader>, runtime: Arc<DecodeRuntime>) -> Self {
310        Self { inner, runtime }
311    }
312
313    fn read_tiles_adaptive(
314        &self,
315        reqs: &[TileRequest],
316        output: TileOutputPreference,
317    ) -> Result<Vec<TilePixels>, WsiError> {
318        if !should_adapt_output(&output) {
319            tracing::debug!(
320                requested_tiles = reqs.len(),
321                adaptive_decode = false,
322                "wsi tile batch routed without adaptive decode"
323            );
324            return self
325                .runtime
326                .with_current(|| self.inner.read_tiles(reqs, output));
327        }
328        let route_sample_size = self.runtime.options.route_sample_size();
329        let Some(key) = route_key_for_batch(self.inner.as_ref(), reqs, &output, route_sample_size)
330        else {
331            tracing::debug!(
332                requested_tiles = reqs.len(),
333                route_sample_size,
334                adaptive_decode = true,
335                route_key_available = false,
336                "wsi adaptive decode fell back to requested output"
337            );
338            return self
339                .runtime
340                .with_current(|| self.inner.read_tiles(reqs, output));
341        };
342        if reqs.len() >= DIRECT_DEVICE_BATCH_THRESHOLD {
343            tracing::debug!(
344                requested_tiles = reqs.len(),
345                route_sample_size,
346                direct_device_batch_threshold = DIRECT_DEVICE_BATCH_THRESHOLD,
347                adaptive_decode = true,
348                route_key_available = true,
349                "wsi adaptive decode sent large batch through requested output"
350            );
351            return self
352                .runtime
353                .with_current(|| self.inner.read_tiles(reqs, output));
354        }
355        let (route, measured) = match self.runtime.cached_route(&key) {
356            Some(decision) => {
357                tracing::debug!(
358                    requested_tiles = reqs.len(),
359                    route_sample_size,
360                    route_cache_hit = true,
361                    route = ?decision.winner,
362                    sample_tile_count = decision.sample_tile_count,
363                    cpu_elapsed_ms = decision.cpu_elapsed.as_secs_f64() * 1000.0,
364                    device_elapsed_ms = decision.device_elapsed.as_secs_f64() * 1000.0,
365                    device_tile_count = decision.device_tile_count,
366                    "wsi adaptive decode reused cached route"
367                );
368                (decision.winner, None)
369            }
370            None => {
371                let measured = self.measure_route(reqs, output.clone())?;
372                let winner = measured.decision.winner;
373                tracing::debug!(
374                    requested_tiles = reqs.len(),
375                    route_sample_size,
376                    route_cache_hit = false,
377                    route = ?winner,
378                    sample_tile_count = measured.decision.sample_tile_count,
379                    cpu_elapsed_ms = measured.decision.cpu_elapsed.as_secs_f64() * 1000.0,
380                    device_elapsed_ms = measured.decision.device_elapsed.as_secs_f64() * 1000.0,
381                    device_tile_count = measured.decision.device_tile_count,
382                    "wsi adaptive decode measured route"
383                );
384                self.runtime.store_route(key, measured.decision.clone());
385                (winner, Some(measured.sample_tiles))
386            }
387        };
388        let routed_output = match route {
389            DecodeRoute::Cpu => TileOutputPreference::cpu(),
390            DecodeRoute::Device => output,
391        };
392        if let Some(mut measured) = measured {
393            let sample_len = reqs.len().min(self.runtime.options.route_sample_size());
394            if measured.len() == sample_len {
395                if sample_len == reqs.len() {
396                    return Ok(measured);
397                }
398                let mut rest = self
399                    .runtime
400                    .with_current(|| self.inner.read_tiles(&reqs[sample_len..], routed_output))?;
401                measured.append(&mut rest);
402                return Ok(measured);
403            }
404        }
405        self.runtime
406            .with_current(|| self.inner.read_tiles(reqs, routed_output))
407    }
408
409    fn measure_route(
410        &self,
411        reqs: &[TileRequest],
412        device_output: TileOutputPreference,
413    ) -> Result<MeasuredDecodeRoute, WsiError> {
414        let sample_len = reqs.len().min(self.runtime.options.route_sample_size());
415        let sample = &reqs[..sample_len];
416
417        let device_started = Instant::now();
418        let device_result = self
419            .runtime
420            .with_current(|| self.inner.read_tiles(sample, device_output));
421        let device_elapsed = device_started.elapsed();
422        let device_tile_count = device_result
423            .as_ref()
424            .map(|tiles| {
425                tiles
426                    .iter()
427                    .filter(|tile| matches!(tile, TilePixels::Device(_)))
428                    .count()
429            })
430            .unwrap_or(0);
431        let device_result = match device_result {
432            Ok(device_tiles) if device_tile_count == 0 => {
433                return Ok(MeasuredDecodeRoute {
434                    decision: DecodeRouteDecision::measured(
435                        device_tiles.len(),
436                        device_elapsed,
437                        device_elapsed,
438                        device_tile_count,
439                    ),
440                    sample_tiles: device_tiles,
441                });
442            }
443            other => other,
444        };
445
446        let cpu_started = Instant::now();
447        let cpu_tiles = self
448            .runtime
449            .with_current(|| self.inner.read_tiles(sample, TileOutputPreference::cpu()))?;
450        let cpu_elapsed = cpu_started.elapsed();
451
452        let decision = DecodeRouteDecision::measured(
453            cpu_tiles.len(),
454            cpu_elapsed,
455            device_elapsed,
456            device_tile_count,
457        );
458        let sample_tiles = match decision.winner {
459            DecodeRoute::Cpu => cpu_tiles,
460            DecodeRoute::Device => device_result?,
461        };
462
463        Ok(MeasuredDecodeRoute {
464            decision,
465            sample_tiles,
466        })
467    }
468}
469
470impl SlideReader for AdaptiveDecodeReader {
471    fn dataset(&self) -> &Dataset {
472        self.inner.dataset()
473    }
474
475    fn tile_codec_kind(&self, req: &TileRequest) -> TileCodecKind {
476        self.inner.tile_codec_kind(req)
477    }
478
479    fn level_source_kind(
480        &self,
481        scene: crate::core::types::SceneId,
482        series: crate::core::types::SeriesId,
483        level: crate::core::types::LevelIdx,
484    ) -> Result<crate::core::types::LevelSourceKind, WsiError> {
485        self.inner.level_source_kind(scene, series, level)
486    }
487
488    fn read_tiles(
489        &self,
490        reqs: &[TileRequest],
491        output: TileOutputPreference,
492    ) -> Result<Vec<TilePixels>, WsiError> {
493        self.read_tiles_adaptive(reqs, output)
494    }
495
496    fn read_tile_cpu(&self, req: &TileRequest) -> Result<CpuTile, WsiError> {
497        self.runtime.with_current(|| self.inner.read_tile_cpu(req))
498    }
499
500    fn read_raw_compressed_tile(
501        &self,
502        req: &TileRequest,
503    ) -> Result<crate::core::types::RawCompressedTile, WsiError> {
504        self.inner.read_raw_compressed_tile(req)
505    }
506
507    fn read_raw_compressed_display_tile(
508        &self,
509        req: &crate::core::types::TileViewRequest,
510    ) -> Result<crate::core::types::RawCompressedTile, WsiError> {
511        self.inner.read_raw_compressed_display_tile(req)
512    }
513
514    fn read_tiles_cpu(&self, reqs: &[TileRequest]) -> Result<Vec<CpuTile>, WsiError> {
515        self.runtime
516            .with_current(|| self.inner.read_tiles_cpu(reqs))
517    }
518
519    fn use_display_tile_cache(&self, req: &crate::core::types::TileViewRequest) -> bool {
520        self.inner.use_display_tile_cache(req)
521    }
522
523    fn read_region_fastpath(
524        &self,
525        ctx: &mut crate::core::registry::SlideReadContext<'_>,
526        req: &crate::core::types::RegionRequest,
527    ) -> Option<Result<CpuTile, WsiError>> {
528        self.runtime
529            .with_current(|| self.inner.read_region_fastpath(ctx, req))
530    }
531
532    fn read_region(
533        &self,
534        req: &crate::core::types::RegionRequest,
535        output: TileOutputPreference,
536    ) -> Result<TilePixels, WsiError> {
537        self.runtime
538            .with_current(|| self.inner.read_region(req, output))
539    }
540
541    fn read_display_tile(
542        &self,
543        req: &crate::core::types::TileViewRequest,
544    ) -> Result<CpuTile, WsiError> {
545        self.runtime
546            .with_current(|| self.inner.read_display_tile(req))
547    }
548
549    fn associated_image(&self, name: &str) -> Result<Option<CpuTile>, WsiError> {
550        self.inner.associated_image(name)
551    }
552
553    fn read_associated(&self, name: &str) -> Result<CpuTile, WsiError> {
554        self.inner.read_associated(name)
555    }
556
557    fn recommended_shared_cache_bytes(&self) -> Option<u64> {
558        self.inner.recommended_shared_cache_bytes()
559    }
560}
561
562fn should_adapt_output(output: &TileOutputPreference) -> bool {
563    matches!(output, TileOutputPreference::PreferDevice { .. })
564        && output.compressed_device_decode_enabled()
565        && output.adaptive_decode_route_enabled()
566}
567
568fn route_key_for_batch(
569    reader: &dyn SlideReader,
570    reqs: &[TileRequest],
571    output: &TileOutputPreference,
572    route_sample_size: usize,
573) -> Option<DecodeRouteKey> {
574    let first = reqs.first()?;
575    if !reqs.iter().all(|req| {
576        req.scene == first.scene && req.series == first.series && req.level == first.level
577    }) {
578        return None;
579    }
580    let codec_kind = reader.tile_codec_kind(first);
581    if !matches!(codec_kind, TileCodecKind::Jp2k | TileCodecKind::Htj2k) {
582        return None;
583    }
584    if !reqs
585        .iter()
586        .all(|req| reader.tile_codec_kind(req) == codec_kind)
587    {
588        return None;
589    }
590    let level = dataset_level(
591        reader.dataset(),
592        first.scene.get(),
593        first.series.get(),
594        first.level.get(),
595    )?;
596    let tile_grid = route_tile_grid(level)?;
597    Some(DecodeRouteKey {
598        dataset_id: reader.dataset().id.0,
599        scene: first.scene.get(),
600        series: first.series.get(),
601        level: first.level.get(),
602        tile_grid,
603        codec_kind,
604        output_backend: output.backend(),
605        device_backend_identity: device_backend_identity(output),
606        sample_tile_count: reqs.len().min(route_sample_size.max(1)),
607    })
608}
609
610fn dataset_level(dataset: &Dataset, scene: usize, series: usize, level: u32) -> Option<&Level> {
611    dataset
612        .scenes
613        .get(scene)?
614        .series
615        .get(series)?
616        .levels
617        .get(level as usize)
618}
619
620fn route_tile_grid(level: &Level) -> Option<RouteTileGrid> {
621    match &level.tile_layout {
622        TileLayout::Regular {
623            tile_width,
624            tile_height,
625            tiles_across,
626            tiles_down,
627        } => Some(RouteTileGrid {
628            tile_width: *tile_width,
629            tile_height: *tile_height,
630            tiles_across: *tiles_across,
631            tiles_down: *tiles_down,
632        }),
633        _ => None,
634    }
635}
636
637fn device_backend_identity(output: &TileOutputPreference) -> String {
638    #[cfg(feature = "metal")]
639    if let Some(metal) = output.metal_sessions() {
640        return format!("{:?}:{}", output.backend(), metal.device_identity());
641    }
642    #[cfg(feature = "cuda")]
643    if let Some(cuda) = output.cuda_sessions() {
644        return format!("{:?}:{}", output.backend(), cuda.device_identity());
645    }
646    format!("{:?}", output.backend())
647}
648
649#[cfg(test)]
650mod tests {
651    use super::*;
652    use crate::core::types::*;
653    use crate::properties::Properties;
654    use std::collections::HashMap;
655    use std::sync::atomic::{AtomicUsize, Ordering};
656
657    struct CountingAdaptiveSource {
658        dataset: Dataset,
659        batch_reads: Arc<AtomicUsize>,
660        requested_tiles: Arc<AtomicUsize>,
661    }
662
663    impl CountingAdaptiveSource {
664        fn new(batch_reads: Arc<AtomicUsize>, requested_tiles: Arc<AtomicUsize>) -> Self {
665            Self {
666                dataset: Dataset {
667                    id: DatasetId::new(42),
668                    scenes: vec![Scene {
669                        id: "scene".into(),
670                        name: None,
671                        series: vec![Series {
672                            id: "series".into(),
673                            axes: AxesShape::default(),
674                            levels: vec![Level {
675                                dimensions: (128, 128),
676                                downsample: 1.0,
677                                tile_layout: TileLayout::Regular {
678                                    tile_width: 128,
679                                    tile_height: 128,
680                                    tiles_across: 1,
681                                    tiles_down: 1,
682                                },
683                            }],
684                            sample_type: SampleType::Uint8,
685                            channels: vec![
686                                ChannelInfo {
687                                    name: Some("R".into()),
688                                    color: None,
689                                    excitation_nm: None,
690                                    emission_nm: None,
691                                },
692                                ChannelInfo {
693                                    name: Some("G".into()),
694                                    color: None,
695                                    excitation_nm: None,
696                                    emission_nm: None,
697                                },
698                                ChannelInfo {
699                                    name: Some("B".into()),
700                                    color: None,
701                                    excitation_nm: None,
702                                    emission_nm: None,
703                                },
704                            ],
705                        }],
706                    }],
707                    associated_images: HashMap::new(),
708                    properties: Properties::new(),
709                    icc_profiles: HashMap::new(),
710                    source_icc_profiles: Vec::new(),
711                },
712                batch_reads,
713                requested_tiles,
714            }
715        }
716    }
717
718    impl SlideReader for CountingAdaptiveSource {
719        fn dataset(&self) -> &Dataset {
720            &self.dataset
721        }
722
723        fn tile_codec_kind(&self, _req: &TileRequest) -> TileCodecKind {
724            TileCodecKind::Jp2k
725        }
726
727        fn read_tiles(
728            &self,
729            reqs: &[TileRequest],
730            _output: TileOutputPreference,
731        ) -> Result<Vec<TilePixels>, WsiError> {
732            self.batch_reads.fetch_add(1, Ordering::SeqCst);
733            self.requested_tiles.fetch_add(reqs.len(), Ordering::SeqCst);
734            reqs.iter()
735                .map(|req| self.read_tile_cpu(req).map(TilePixels::Cpu))
736                .collect()
737        }
738
739        fn read_tile_cpu(&self, _req: &TileRequest) -> Result<CpuTile, WsiError> {
740            Ok(CpuTile {
741                width: 128,
742                height: 128,
743                channels: 3,
744                color_space: ColorSpace::Rgb,
745                layout: CpuTileLayout::Interleaved,
746                data: CpuTileData::u8(vec![7; 128 * 128 * 3]),
747            })
748        }
749
750        fn read_associated(&self, name: &str) -> Result<CpuTile, WsiError> {
751            Err(WsiError::AssociatedImageNotFound(name.into()))
752        }
753    }
754
755    #[test]
756    fn default_decode_options_reuse_shared_runtime() {
757        let first =
758            DecodeRuntime::arc_for_options(DecodeExecutionOptions::default()).expect("runtime");
759        let second =
760            DecodeRuntime::arc_for_options(DecodeExecutionOptions::default()).expect("runtime");
761
762        assert!(Arc::ptr_eq(&first, &second));
763    }
764
765    #[test]
766    fn route_cache_is_bounded() {
767        let runtime = DecodeRuntime::new(DecodeExecutionOptions::default()).expect("runtime");
768        let first_key = route_key_for_test(0);
769
770        for sequence in 0..ROUTE_CACHE_MAX_ENTRIES + 5 {
771            runtime.store_route(
772                route_key_for_test(sequence),
773                DecodeRouteDecision::measured(
774                    1,
775                    Duration::from_millis(2),
776                    Duration::from_millis(1),
777                    1,
778                ),
779            );
780        }
781
782        let cache_len = runtime
783            .route_cache
784            .lock()
785            .unwrap_or_else(|err| err.into_inner())
786            .len();
787        assert_eq!(cache_len, ROUTE_CACHE_MAX_ENTRIES);
788        assert!(runtime.cached_route(&first_key).is_none());
789        assert!(runtime
790            .cached_route(&route_key_for_test(ROUTE_CACHE_MAX_ENTRIES + 4))
791            .is_some());
792    }
793
794    fn route_key_for_test(sequence: usize) -> DecodeRouteKey {
795        DecodeRouteKey {
796            dataset_id: sequence as u128,
797            scene: 0,
798            series: 0,
799            level: 0,
800            tile_grid: RouteTileGrid {
801                tile_width: 128,
802                tile_height: 128,
803                tiles_across: 1,
804                tiles_down: 1,
805            },
806            codec_kind: TileCodecKind::Jp2k,
807            output_backend: OutputBackendRequest::Auto,
808            device_backend_identity: format!("test-{sequence}"),
809            sample_tile_count: 1,
810        }
811    }
812
813    #[test]
814    fn adaptive_route_reuses_device_cpu_fallback_sample_for_first_read() {
815        let batch_reads = Arc::new(AtomicUsize::new(0));
816        let requested_tiles = Arc::new(AtomicUsize::new(0));
817        let runtime = Arc::new(
818            DecodeRuntime::new(DecodeExecutionOptions::default().with_route_sample_size(4))
819                .expect("decode runtime"),
820        );
821        let reader = AdaptiveDecodeReader::new(
822            Box::new(CountingAdaptiveSource::new(
823                batch_reads.clone(),
824                requested_tiles.clone(),
825            )),
826            runtime.clone(),
827        );
828        let req = TileRequest {
829            scene: 0usize.into(),
830            series: 0usize.into(),
831            level: 0u32.into(),
832            plane: PlaneSelection::default().into(),
833            col: 0,
834            row: 0,
835        };
836
837        let tiles = reader
838            .read_tiles(
839                &[req],
840                TileOutputPreference::prefer_device_auto_with_compressed_decode(),
841            )
842            .expect("adaptive read");
843
844        assert_eq!(tiles.len(), 1);
845        assert_eq!(batch_reads.load(Ordering::SeqCst), 1);
846        assert_eq!(requested_tiles.load(Ordering::SeqCst), 1);
847    }
848
#[test]
fn adaptive_route_keys_subsampled_batches_separately() {
    // Route sample size shared by the runtime options and both expected route keys.
    const SAMPLE_SIZE: usize = 4;

    let batch_reads = Arc::new(AtomicUsize::new(0));
    let requested_tiles = Arc::new(AtomicUsize::new(0));
    let options = DecodeExecutionOptions::default().with_route_sample_size(SAMPLE_SIZE);
    let runtime = Arc::new(DecodeRuntime::new(options).expect("decode runtime"));
    let source =
        CountingAdaptiveSource::new(Arc::clone(&batch_reads), Arc::clone(&requested_tiles));
    let reader = AdaptiveDecodeReader::new(Box::new(source), Arc::clone(&runtime));

    let req = TileRequest {
        scene: 0usize.into(),
        series: 0usize.into(),
        level: 0u32.into(),
        plane: PlaneSelection::default().into(),
        col: 0,
        row: 0,
    };
    let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
    // A batch that exactly fills the route sample, made of copies of the same tile.
    let full_sample_batch = vec![req.clone(); SAMPLE_SIZE];

    let single_key = route_key_for_batch(
        reader.inner.as_ref(),
        std::slice::from_ref(&req),
        &output,
        SAMPLE_SIZE,
    )
    .expect("route key is available for one-tile JP2K regular batch");
    let full_sample_key =
        route_key_for_batch(reader.inner.as_ref(), &full_sample_batch, &output, SAMPLE_SIZE)
            .expect("route key is available for JP2K regular tile");

    // Read only the single tile through the adaptive reader.
    let tiles = reader.read_tiles(&[req], output).expect("adaptive read");

    assert_eq!(tiles.len(), 1);
    assert_eq!(batch_reads.load(Ordering::SeqCst), 1);
    assert_eq!(requested_tiles.load(Ordering::SeqCst), 1);
    // The one-tile route must be cached under its own key, separate from the
    // key used by batches that fill the whole sample.
    assert!(
        runtime.cached_route(&single_key).is_some(),
        "a one-tile read should cache its own route"
    );
    assert!(
        runtime.cached_route(&full_sample_key).is_none(),
        "a one-tile read must not poison the route for four-plus-tile batches"
    );
}
902
#[test]
fn default_route_sample_covers_viewer_sized_dicom_device_batches() {
    // The counters are never inspected here; the source just needs somewhere to count.
    let source = CountingAdaptiveSource::new(
        Arc::new(AtomicUsize::new(0)),
        Arc::new(AtomicUsize::new(0)),
    );

    // One row of 15 tiles, roughly what a viewer requests for a visible region.
    let reqs: Vec<TileRequest> = (0..15_i64)
        .map(|col| TileRequest {
            scene: 0usize.into(),
            series: 0usize.into(),
            level: 0u32.into(),
            plane: PlaneSelection::default().into(),
            col,
            row: 0,
        })
        .collect();
    let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
    let default_sample_size = DecodeExecutionOptions::default().route_sample_size();

    let key = route_key_for_batch(&source, &reqs, &output, default_sample_size)
        .expect("route key is available for a viewer-sized JP2K batch");

    // With default options the whole visible batch must be sampled, not a truncated prefix.
    assert_eq!(
        key.sample_tile_count, 15,
        "default adaptive sampling must measure a real visible-tile batch instead of undersampling into the CPU path"
    );
}
933
#[test]
fn adaptive_route_sends_large_jp2k_batches_to_device_preferred_reader_without_sampling() {
    let batch_reads = Arc::new(AtomicUsize::new(0));
    let requested_tiles = Arc::new(AtomicUsize::new(0));
    // Build the runtime and the expected route key from the same (Copy) options
    // value so the key's sample size cannot drift from the runtime's configuration.
    let options = DecodeExecutionOptions::default();
    let runtime = Arc::new(DecodeRuntime::new(options).expect("decode runtime"));
    let reader = AdaptiveDecodeReader::new(
        Box::new(CountingAdaptiveSource::new(
            batch_reads.clone(),
            requested_tiles.clone(),
        )),
        runtime.clone(),
    );
    // A viewer-sized row of 15 tiles.
    let reqs = (0..15)
        .map(|col| TileRequest {
            scene: 0usize.into(),
            series: 0usize.into(),
            level: 0u32.into(),
            plane: PlaneSelection::default().into(),
            col: col as i64,
            row: 0,
        })
        .collect::<Vec<_>>();
    let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
    let key = route_key_for_batch(
        reader.inner.as_ref(),
        &reqs,
        &output,
        options.route_sample_size(),
    )
    .expect("route key is available for a viewer-sized JP2K batch");

    let tiles = reader.read_tiles(&reqs, output).expect("adaptive read");

    assert_eq!(tiles.len(), 15);
    // A single batch read means no separate sampling pass happened before routing.
    assert_eq!(
        batch_reads.load(Ordering::SeqCst),
        1,
        "large JP2K batches should avoid cold adaptive double-decode"
    );
    assert_eq!(requested_tiles.load(Ordering::SeqCst), 15);
    // Because no sample was measured, nothing should have been cached for this key.
    assert!(
        runtime.cached_route(&key).is_none(),
        "direct large-batch routing should not cache a CPU-biased sample"
    );
}
979
#[test]
fn adaptive_route_samples_uncached_subthreshold_batches_before_routing_remainder() {
    // Route sample size shared by the runtime options and the expected route key.
    const SAMPLE_SIZE: usize = 4;

    let batch_reads = Arc::new(AtomicUsize::new(0));
    let requested_tiles = Arc::new(AtomicUsize::new(0));
    let runtime = Arc::new(
        DecodeRuntime::new(DecodeExecutionOptions::default().with_route_sample_size(SAMPLE_SIZE))
            .expect("decode runtime"),
    );
    let reader = AdaptiveDecodeReader::new(
        Box::new(CountingAdaptiveSource::new(
            batch_reads.clone(),
            requested_tiles.clone(),
        )),
        runtime.clone(),
    );
    // A 7-tile batch: larger than the sample, but a "subthreshold" batch per this test's name.
    let reqs = (0..7)
        .map(|col| TileRequest {
            scene: 0usize.into(),
            series: 0usize.into(),
            level: 0u32.into(),
            plane: PlaneSelection::default().into(),
            col: col as i64,
            row: 0,
        })
        .collect::<Vec<_>>();
    let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
    let key = route_key_for_batch(reader.inner.as_ref(), &reqs, &output, SAMPLE_SIZE)
        .expect("route key is available for JP2K regular tile");

    // Cold read: expect one batch read for the sample and one for the remainder.
    let tiles = reader
        .read_tiles(&reqs, output.clone())
        .expect("adaptive read");

    assert_eq!(tiles.len(), 7);
    assert_eq!(
        batch_reads.load(Ordering::SeqCst),
        2,
        "uncached subthreshold batches should sample, cache a route, then route the remainder"
    );
    assert_eq!(requested_tiles.load(Ordering::SeqCst), 7);
    assert!(
        runtime.cached_route(&key).is_some(),
        "subthreshold auto routing should cache a measured route"
    );

    // Warm read of the same subthreshold batch: the cached route should be reused,
    // so the whole batch goes out in a single read without resampling.
    batch_reads.store(0, Ordering::SeqCst);
    requested_tiles.store(0, Ordering::SeqCst);
    let tiles = reader
        .read_tiles(&reqs, output)
        .expect("cached adaptive read");

    assert_eq!(tiles.len(), 7);
    assert_eq!(
        batch_reads.load(Ordering::SeqCst),
        1,
        "cached subthreshold routes should not resample"
    );
    assert_eq!(requested_tiles.load(Ordering::SeqCst), 7);
}
1039}