1use crate::core::registry::SlideReader;
2use crate::core::types::{
3 CpuTile, Dataset, Level, OutputBackendRequest, TileCodecKind, TileLayout, TileOutputPreference,
4 TilePixels, TileRequest,
5};
6use crate::error::WsiError;
7use rayon::ThreadPool;
8use std::cell::RefCell;
9use std::collections::{HashMap, VecDeque};
10use std::num::NonZeroUsize;
11use std::sync::{Arc, Mutex, OnceLock};
12use std::time::{Duration, Instant};
13
/// Route sample size used by `DecodeExecutionOptions::default()`.
const DEFAULT_ROUTE_SAMPLE_SIZE: usize = 32;
/// Batches with at least this many requests skip route measurement and are
/// read with the caller's requested output directly.
const DIRECT_DEVICE_BATCH_THRESHOLD: usize = 8;
/// The device route wins a measurement only when its elapsed time is at most
/// this fraction of the CPU elapsed time.
const DEVICE_WIN_RATIO: f64 = 0.85;
/// Maximum number of route decisions kept in a `DecodeRouteCache`.
const ROUTE_CACHE_MAX_ENTRIES: usize = 1024;
18
// Per-thread slot holding the runtime installed by `DecodeRuntime::with_current`
// while its closure runs; `None` when no runtime is installed on this thread.
thread_local! {
    static CURRENT_DECODE_RUNTIME: RefCell<Option<Arc<DecodeRuntime>>> = const { RefCell::new(None) };
}
22
23#[derive(Debug, Clone, Copy, PartialEq, Eq)]
24#[non_exhaustive]
25pub struct DecodeExecutionOptions {
26 jp2k_cpu_threads: Option<NonZeroUsize>,
27 route_sample_size: usize,
28}
29
30impl DecodeExecutionOptions {
31 pub fn with_jp2k_cpu_threads(mut self, threads: NonZeroUsize) -> Self {
32 self.jp2k_cpu_threads = Some(threads);
33 self
34 }
35
36 pub fn with_route_sample_size(mut self, sample_size: usize) -> Self {
37 self.route_sample_size = sample_size.max(1);
38 self
39 }
40
41 pub fn jp2k_cpu_threads(&self) -> Option<NonZeroUsize> {
42 self.jp2k_cpu_threads
43 }
44
45 pub fn route_sample_size(&self) -> usize {
46 self.route_sample_size
47 }
48}
49
50impl Default for DecodeExecutionOptions {
51 fn default() -> Self {
52 Self {
53 jp2k_cpu_threads: None,
54 route_sample_size: DEFAULT_ROUTE_SAMPLE_SIZE,
55 }
56 }
57}
58
/// Route chosen for reading a tile batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DecodeRoute {
    /// Read the batch with `TileOutputPreference::cpu()`.
    Cpu,
    /// Read the batch with the caller's requested output preference.
    Device,
}
65
66#[derive(Debug, Clone, PartialEq)]
67#[non_exhaustive]
68pub struct DecodeRouteDecision {
69 pub winner: DecodeRoute,
70 pub sample_tile_count: usize,
71 pub cpu_elapsed: Duration,
72 pub device_elapsed: Duration,
73 pub device_tile_count: usize,
74}
75
76impl DecodeRouteDecision {
77 pub fn measured(
78 sample_tile_count: usize,
79 cpu_elapsed: Duration,
80 device_elapsed: Duration,
81 device_tile_count: usize,
82 ) -> Self {
83 Self {
84 winner: Self::winner_for_measurement(cpu_elapsed, device_elapsed, device_tile_count),
85 sample_tile_count,
86 cpu_elapsed,
87 device_elapsed,
88 device_tile_count,
89 }
90 }
91
92 pub fn winner_for_measurement(
93 cpu_elapsed: Duration,
94 device_elapsed: Duration,
95 device_tile_count: usize,
96 ) -> DecodeRoute {
97 let cpu_ms = cpu_elapsed.as_secs_f64() * 1000.0;
98 let device_ms = device_elapsed.as_secs_f64() * 1000.0;
99 if device_tile_count > 0 && cpu_ms > 0.0 && device_ms <= cpu_ms * DEVICE_WIN_RATIO {
100 DecodeRoute::Device
101 } else {
102 DecodeRoute::Cpu
103 }
104 }
105}
106
/// A route decision together with the sample tiles that were decoded while
/// measuring it, so the caller can return them instead of reading again.
struct MeasuredDecodeRoute {
    decision: DecodeRouteDecision,
    sample_tiles: Vec<TilePixels>,
}
111
112#[derive(Debug)]
113pub(crate) struct DecodeRuntime {
114 options: DecodeExecutionOptions,
115 jp2k_cpu_pool: Option<ThreadPool>,
116 route_cache: Mutex<DecodeRouteCache>,
117}
118
119impl DecodeRuntime {
120 pub(crate) fn new(options: DecodeExecutionOptions) -> Result<Self, WsiError> {
121 Self::build(options, true)
122 }
123
124 pub(crate) fn arc_for_options(options: DecodeExecutionOptions) -> Result<Arc<Self>, WsiError> {
125 if options == DecodeExecutionOptions::default() {
126 Ok(Self::default_arc())
127 } else {
128 Ok(Arc::new(Self::new(options)?))
129 }
130 }
131
132 fn build(options: DecodeExecutionOptions, fail_on_pool_error: bool) -> Result<Self, WsiError> {
133 let threads = options
134 .jp2k_cpu_threads
135 .map_or_else(default_jp2k_cpu_threads, NonZeroUsize::get);
136 let jp2k_cpu_pool = match rayon::ThreadPoolBuilder::new()
137 .num_threads(threads)
138 .thread_name(|index| format!("wsi_rs-jp2k-cpu-{index}"))
139 .build()
140 {
141 Ok(pool) => Some(pool),
142 Err(err) if fail_on_pool_error => {
143 return Err(WsiError::Unsupported {
144 reason: format!("failed to initialize JP2K CPU decode pool: {err}"),
145 });
146 }
147 Err(err) => {
148 tracing::error!(
149 error = %err,
150 "failed to initialize default JP2K CPU decode pool; falling back to inline decode"
151 );
152 None
153 }
154 };
155 Ok(Self {
156 options,
157 jp2k_cpu_pool,
158 route_cache: Mutex::new(DecodeRouteCache::new()),
159 })
160 }
161
162 pub(crate) fn default_arc() -> Arc<Self> {
163 static DEFAULT_RUNTIME: OnceLock<Arc<DecodeRuntime>> = OnceLock::new();
164 DEFAULT_RUNTIME
165 .get_or_init(|| {
166 Arc::new(match Self::build(DecodeExecutionOptions::default(), false) {
167 Ok(runtime) => runtime,
168 Err(err) => {
169 tracing::error!(
170 error = %err,
171 "failed to initialize default decode runtime; falling back to inline decode"
172 );
173 Self::inline(DecodeExecutionOptions::default())
174 }
175 })
176 })
177 .clone()
178 }
179
180 fn inline(options: DecodeExecutionOptions) -> Self {
181 Self {
182 options,
183 jp2k_cpu_pool: None,
184 route_cache: Mutex::new(DecodeRouteCache::new()),
185 }
186 }
187
188 pub(crate) fn install_jp2k_cpu<R: Send>(&self, op: impl FnOnce() -> R + Send) -> R {
189 if let Some(pool) = &self.jp2k_cpu_pool {
190 pool.install(op)
191 } else {
192 op()
193 }
194 }
195
196 pub(crate) fn has_jp2k_cpu_pool(&self) -> bool {
197 self.jp2k_cpu_pool.is_some()
198 }
199
200 pub(crate) fn options(&self) -> DecodeExecutionOptions {
201 self.options
202 }
203
204 pub(crate) fn with_current<T>(self: &Arc<Self>, f: impl FnOnce() -> T) -> T {
205 struct Restore(Option<Arc<DecodeRuntime>>);
206 impl Drop for Restore {
207 fn drop(&mut self) {
208 let previous = self.0.take();
209 CURRENT_DECODE_RUNTIME.with(|slot| {
210 *slot.borrow_mut() = previous;
211 });
212 }
213 }
214
215 let previous = CURRENT_DECODE_RUNTIME.with(|slot| slot.replace(Some(self.clone())));
216 let _restore = Restore(previous);
217 f()
218 }
219
220 fn cached_route(&self, key: &DecodeRouteKey) -> Option<DecodeRouteDecision> {
221 self.route_cache
222 .lock()
223 .unwrap_or_else(|err| err.into_inner())
224 .get(key)
225 }
226
227 fn store_route(&self, key: DecodeRouteKey, decision: DecodeRouteDecision) {
228 self.route_cache
229 .lock()
230 .unwrap_or_else(|err| err.into_inner())
231 .insert(key, decision);
232 }
233}
234
235#[derive(Debug)]
236struct DecodeRouteCache {
237 entries: HashMap<DecodeRouteKey, DecodeRouteDecision>,
238 insertion_order: VecDeque<DecodeRouteKey>,
239}
240
241impl DecodeRouteCache {
242 fn new() -> Self {
243 Self {
244 entries: HashMap::new(),
245 insertion_order: VecDeque::new(),
246 }
247 }
248
249 fn get(&self, key: &DecodeRouteKey) -> Option<DecodeRouteDecision> {
250 self.entries.get(key).cloned()
251 }
252
253 fn insert(&mut self, key: DecodeRouteKey, decision: DecodeRouteDecision) {
254 if !self.entries.contains_key(&key) {
255 while self.entries.len() >= ROUTE_CACHE_MAX_ENTRIES {
256 let Some(evicted) = self.insertion_order.pop_front() else {
257 break;
258 };
259 self.entries.remove(&evicted);
260 }
261 self.insertion_order.push_back(key.clone());
262 }
263 self.entries.insert(key, decision);
264 }
265
266 #[cfg(test)]
267 fn len(&self) -> usize {
268 self.entries.len()
269 }
270}
271
272pub(crate) fn current_decode_runtime() -> Option<Arc<DecodeRuntime>> {
273 CURRENT_DECODE_RUNTIME.with(|slot| slot.borrow().clone())
274}
275
/// Thread count used for the JP2K CPU pool when none is configured: the
/// available parallelism reported by the standard library, or one when it
/// cannot be determined.
fn default_jp2k_cpu_threads() -> usize {
    let detected = std::thread::available_parallelism().map(NonZeroUsize::get);
    detected.unwrap_or(1)
}
281
/// Cache key identifying the conditions under which a route was measured.
/// Built by `route_key_for_batch`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct DecodeRouteKey {
    dataset_id: u128,
    scene: usize,
    series: usize,
    level: u32,
    /// Regular tile grid of the level the batch reads from.
    tile_grid: RouteTileGrid,
    codec_kind: TileCodecKind,
    output_backend: OutputBackendRequest,
    /// String produced by `device_backend_identity` for the requested output.
    device_backend_identity: String,
    /// Batch length capped at the route sample size, so batches of different
    /// sampled sizes are cached separately.
    sample_tile_count: usize,
}
294
/// Copy of the fields of a `TileLayout::Regular` layout, used as part of a
/// route cache key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct RouteTileGrid {
    tile_width: u32,
    tile_height: u32,
    tiles_across: u64,
    tiles_down: u64,
}
302
303pub(crate) struct AdaptiveDecodeReader {
304 inner: Box<dyn SlideReader>,
305 runtime: Arc<DecodeRuntime>,
306}
307
308impl AdaptiveDecodeReader {
309 pub(crate) fn new(inner: Box<dyn SlideReader>, runtime: Arc<DecodeRuntime>) -> Self {
310 Self { inner, runtime }
311 }
312
313 fn read_tiles_adaptive(
314 &self,
315 reqs: &[TileRequest],
316 output: TileOutputPreference,
317 ) -> Result<Vec<TilePixels>, WsiError> {
318 if !should_adapt_output(&output) {
319 tracing::debug!(
320 requested_tiles = reqs.len(),
321 adaptive_decode = false,
322 "wsi tile batch routed without adaptive decode"
323 );
324 return self
325 .runtime
326 .with_current(|| self.inner.read_tiles(reqs, output));
327 }
328 let route_sample_size = self.runtime.options.route_sample_size();
329 let Some(key) = route_key_for_batch(self.inner.as_ref(), reqs, &output, route_sample_size)
330 else {
331 tracing::debug!(
332 requested_tiles = reqs.len(),
333 route_sample_size,
334 adaptive_decode = true,
335 route_key_available = false,
336 "wsi adaptive decode fell back to requested output"
337 );
338 return self
339 .runtime
340 .with_current(|| self.inner.read_tiles(reqs, output));
341 };
342 if reqs.len() >= DIRECT_DEVICE_BATCH_THRESHOLD {
343 tracing::debug!(
344 requested_tiles = reqs.len(),
345 route_sample_size,
346 direct_device_batch_threshold = DIRECT_DEVICE_BATCH_THRESHOLD,
347 adaptive_decode = true,
348 route_key_available = true,
349 "wsi adaptive decode sent large batch through requested output"
350 );
351 return self
352 .runtime
353 .with_current(|| self.inner.read_tiles(reqs, output));
354 }
355 let (route, measured) = match self.runtime.cached_route(&key) {
356 Some(decision) => {
357 tracing::debug!(
358 requested_tiles = reqs.len(),
359 route_sample_size,
360 route_cache_hit = true,
361 route = ?decision.winner,
362 sample_tile_count = decision.sample_tile_count,
363 cpu_elapsed_ms = decision.cpu_elapsed.as_secs_f64() * 1000.0,
364 device_elapsed_ms = decision.device_elapsed.as_secs_f64() * 1000.0,
365 device_tile_count = decision.device_tile_count,
366 "wsi adaptive decode reused cached route"
367 );
368 (decision.winner, None)
369 }
370 None => {
371 let measured = self.measure_route(reqs, output.clone())?;
372 let winner = measured.decision.winner;
373 tracing::debug!(
374 requested_tiles = reqs.len(),
375 route_sample_size,
376 route_cache_hit = false,
377 route = ?winner,
378 sample_tile_count = measured.decision.sample_tile_count,
379 cpu_elapsed_ms = measured.decision.cpu_elapsed.as_secs_f64() * 1000.0,
380 device_elapsed_ms = measured.decision.device_elapsed.as_secs_f64() * 1000.0,
381 device_tile_count = measured.decision.device_tile_count,
382 "wsi adaptive decode measured route"
383 );
384 self.runtime.store_route(key, measured.decision.clone());
385 (winner, Some(measured.sample_tiles))
386 }
387 };
388 let routed_output = match route {
389 DecodeRoute::Cpu => TileOutputPreference::cpu(),
390 DecodeRoute::Device => output,
391 };
392 if let Some(mut measured) = measured {
393 let sample_len = reqs.len().min(self.runtime.options.route_sample_size());
394 if measured.len() == sample_len {
395 if sample_len == reqs.len() {
396 return Ok(measured);
397 }
398 let mut rest = self
399 .runtime
400 .with_current(|| self.inner.read_tiles(&reqs[sample_len..], routed_output))?;
401 measured.append(&mut rest);
402 return Ok(measured);
403 }
404 }
405 self.runtime
406 .with_current(|| self.inner.read_tiles(reqs, routed_output))
407 }
408
409 fn measure_route(
410 &self,
411 reqs: &[TileRequest],
412 device_output: TileOutputPreference,
413 ) -> Result<MeasuredDecodeRoute, WsiError> {
414 let sample_len = reqs.len().min(self.runtime.options.route_sample_size());
415 let sample = &reqs[..sample_len];
416
417 let device_started = Instant::now();
418 let device_result = self
419 .runtime
420 .with_current(|| self.inner.read_tiles(sample, device_output));
421 let device_elapsed = device_started.elapsed();
422 let device_tile_count = device_result
423 .as_ref()
424 .map(|tiles| {
425 tiles
426 .iter()
427 .filter(|tile| matches!(tile, TilePixels::Device(_)))
428 .count()
429 })
430 .unwrap_or(0);
431 let device_result = match device_result {
432 Ok(device_tiles) if device_tile_count == 0 => {
433 return Ok(MeasuredDecodeRoute {
434 decision: DecodeRouteDecision::measured(
435 device_tiles.len(),
436 device_elapsed,
437 device_elapsed,
438 device_tile_count,
439 ),
440 sample_tiles: device_tiles,
441 });
442 }
443 other => other,
444 };
445
446 let cpu_started = Instant::now();
447 let cpu_tiles = self
448 .runtime
449 .with_current(|| self.inner.read_tiles(sample, TileOutputPreference::cpu()))?;
450 let cpu_elapsed = cpu_started.elapsed();
451
452 let decision = DecodeRouteDecision::measured(
453 cpu_tiles.len(),
454 cpu_elapsed,
455 device_elapsed,
456 device_tile_count,
457 );
458 let sample_tiles = match decision.winner {
459 DecodeRoute::Cpu => cpu_tiles,
460 DecodeRoute::Device => device_result?,
461 };
462
463 Ok(MeasuredDecodeRoute {
464 decision,
465 sample_tiles,
466 })
467 }
468}
469
470impl SlideReader for AdaptiveDecodeReader {
471 fn dataset(&self) -> &Dataset {
472 self.inner.dataset()
473 }
474
475 fn tile_codec_kind(&self, req: &TileRequest) -> TileCodecKind {
476 self.inner.tile_codec_kind(req)
477 }
478
479 fn level_source_kind(
480 &self,
481 scene: crate::core::types::SceneId,
482 series: crate::core::types::SeriesId,
483 level: crate::core::types::LevelIdx,
484 ) -> Result<crate::core::types::LevelSourceKind, WsiError> {
485 self.inner.level_source_kind(scene, series, level)
486 }
487
488 fn read_tiles(
489 &self,
490 reqs: &[TileRequest],
491 output: TileOutputPreference,
492 ) -> Result<Vec<TilePixels>, WsiError> {
493 self.read_tiles_adaptive(reqs, output)
494 }
495
496 fn read_tile_cpu(&self, req: &TileRequest) -> Result<CpuTile, WsiError> {
497 self.runtime.with_current(|| self.inner.read_tile_cpu(req))
498 }
499
500 fn read_raw_compressed_tile(
501 &self,
502 req: &TileRequest,
503 ) -> Result<crate::core::types::RawCompressedTile, WsiError> {
504 self.inner.read_raw_compressed_tile(req)
505 }
506
507 fn read_raw_compressed_display_tile(
508 &self,
509 req: &crate::core::types::TileViewRequest,
510 ) -> Result<crate::core::types::RawCompressedTile, WsiError> {
511 self.inner.read_raw_compressed_display_tile(req)
512 }
513
514 fn read_tiles_cpu(&self, reqs: &[TileRequest]) -> Result<Vec<CpuTile>, WsiError> {
515 self.runtime
516 .with_current(|| self.inner.read_tiles_cpu(reqs))
517 }
518
519 fn use_display_tile_cache(&self, req: &crate::core::types::TileViewRequest) -> bool {
520 self.inner.use_display_tile_cache(req)
521 }
522
523 fn read_region_fastpath(
524 &self,
525 ctx: &mut crate::core::registry::SlideReadContext<'_>,
526 req: &crate::core::types::RegionRequest,
527 ) -> Option<Result<CpuTile, WsiError>> {
528 self.runtime
529 .with_current(|| self.inner.read_region_fastpath(ctx, req))
530 }
531
532 fn read_region(
533 &self,
534 req: &crate::core::types::RegionRequest,
535 output: TileOutputPreference,
536 ) -> Result<TilePixels, WsiError> {
537 self.runtime
538 .with_current(|| self.inner.read_region(req, output))
539 }
540
541 fn read_display_tile(
542 &self,
543 req: &crate::core::types::TileViewRequest,
544 ) -> Result<CpuTile, WsiError> {
545 self.runtime
546 .with_current(|| self.inner.read_display_tile(req))
547 }
548
549 fn associated_image(&self, name: &str) -> Result<Option<CpuTile>, WsiError> {
550 self.inner.associated_image(name)
551 }
552
553 fn read_associated(&self, name: &str) -> Result<CpuTile, WsiError> {
554 self.inner.read_associated(name)
555 }
556
557 fn recommended_shared_cache_bytes(&self) -> Option<u64> {
558 self.inner.recommended_shared_cache_bytes()
559 }
560}
561
562fn should_adapt_output(output: &TileOutputPreference) -> bool {
563 matches!(output, TileOutputPreference::PreferDevice { .. })
564 && output.compressed_device_decode_enabled()
565 && output.adaptive_decode_route_enabled()
566}
567
568fn route_key_for_batch(
569 reader: &dyn SlideReader,
570 reqs: &[TileRequest],
571 output: &TileOutputPreference,
572 route_sample_size: usize,
573) -> Option<DecodeRouteKey> {
574 let first = reqs.first()?;
575 if !reqs.iter().all(|req| {
576 req.scene == first.scene && req.series == first.series && req.level == first.level
577 }) {
578 return None;
579 }
580 let codec_kind = reader.tile_codec_kind(first);
581 if !matches!(codec_kind, TileCodecKind::Jp2k | TileCodecKind::Htj2k) {
582 return None;
583 }
584 if !reqs
585 .iter()
586 .all(|req| reader.tile_codec_kind(req) == codec_kind)
587 {
588 return None;
589 }
590 let level = dataset_level(
591 reader.dataset(),
592 first.scene.get(),
593 first.series.get(),
594 first.level.get(),
595 )?;
596 let tile_grid = route_tile_grid(level)?;
597 Some(DecodeRouteKey {
598 dataset_id: reader.dataset().id.0,
599 scene: first.scene.get(),
600 series: first.series.get(),
601 level: first.level.get(),
602 tile_grid,
603 codec_kind,
604 output_backend: output.backend(),
605 device_backend_identity: device_backend_identity(output),
606 sample_tile_count: reqs.len().min(route_sample_size.max(1)),
607 })
608}
609
610fn dataset_level(dataset: &Dataset, scene: usize, series: usize, level: u32) -> Option<&Level> {
611 dataset
612 .scenes
613 .get(scene)?
614 .series
615 .get(series)?
616 .levels
617 .get(level as usize)
618}
619
620fn route_tile_grid(level: &Level) -> Option<RouteTileGrid> {
621 match &level.tile_layout {
622 TileLayout::Regular {
623 tile_width,
624 tile_height,
625 tiles_across,
626 tiles_down,
627 } => Some(RouteTileGrid {
628 tile_width: *tile_width,
629 tile_height: *tile_height,
630 tiles_across: *tiles_across,
631 tiles_down: *tiles_down,
632 }),
633 _ => None,
634 }
635}
636
637fn device_backend_identity(output: &TileOutputPreference) -> String {
638 #[cfg(feature = "metal")]
639 if let Some(metal) = output.metal_sessions() {
640 return format!("{:?}:{}", output.backend(), metal.device_identity());
641 }
642 #[cfg(feature = "cuda")]
643 if let Some(cuda) = output.cuda_sessions() {
644 return format!("{:?}:{}", output.backend(), cuda.device_identity());
645 }
646 format!("{:?}", output.backend())
647}
648
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::types::*;
    use crate::properties::Properties;
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Fake single-level JP2K source that counts batch reads and requested
    /// tiles and always answers with CPU tiles, even for device requests.
    struct CountingAdaptiveSource {
        dataset: Dataset,
        batch_reads: Arc<AtomicUsize>,
        requested_tiles: Arc<AtomicUsize>,
    }

    impl CountingAdaptiveSource {
        fn new(batch_reads: Arc<AtomicUsize>, requested_tiles: Arc<AtomicUsize>) -> Self {
            let channels = ["R", "G", "B"]
                .iter()
                .map(|name| ChannelInfo {
                    name: Some((*name).into()),
                    color: None,
                    excitation_nm: None,
                    emission_nm: None,
                })
                .collect();
            Self {
                dataset: Dataset {
                    id: DatasetId::new(42),
                    scenes: vec![Scene {
                        id: "scene".into(),
                        name: None,
                        series: vec![Series {
                            id: "series".into(),
                            axes: AxesShape::default(),
                            levels: vec![Level {
                                dimensions: (128, 128),
                                downsample: 1.0,
                                tile_layout: TileLayout::Regular {
                                    tile_width: 128,
                                    tile_height: 128,
                                    tiles_across: 1,
                                    tiles_down: 1,
                                },
                            }],
                            sample_type: SampleType::Uint8,
                            channels,
                        }],
                    }],
                    associated_images: HashMap::new(),
                    properties: Properties::new(),
                    icc_profiles: HashMap::new(),
                    source_icc_profiles: Vec::new(),
                },
                batch_reads,
                requested_tiles,
            }
        }
    }

    impl SlideReader for CountingAdaptiveSource {
        fn dataset(&self) -> &Dataset {
            &self.dataset
        }

        fn tile_codec_kind(&self, _req: &TileRequest) -> TileCodecKind {
            TileCodecKind::Jp2k
        }

        fn read_tiles(
            &self,
            reqs: &[TileRequest],
            _output: TileOutputPreference,
        ) -> Result<Vec<TilePixels>, WsiError> {
            self.batch_reads.fetch_add(1, Ordering::SeqCst);
            self.requested_tiles.fetch_add(reqs.len(), Ordering::SeqCst);
            reqs.iter()
                .map(|req| self.read_tile_cpu(req).map(TilePixels::Cpu))
                .collect()
        }

        fn read_tile_cpu(&self, _req: &TileRequest) -> Result<CpuTile, WsiError> {
            Ok(CpuTile {
                width: 128,
                height: 128,
                channels: 3,
                color_space: ColorSpace::Rgb,
                layout: CpuTileLayout::Interleaved,
                data: CpuTileData::u8(vec![7; 128 * 128 * 3]),
            })
        }

        fn read_associated(&self, name: &str) -> Result<CpuTile, WsiError> {
            Err(WsiError::AssociatedImageNotFound(name.into()))
        }
    }

    /// Request for the tile at (`col`, 0) on scene 0, series 0, level 0.
    fn tile_request(col: i64) -> TileRequest {
        TileRequest {
            scene: 0usize.into(),
            series: 0usize.into(),
            level: 0u32.into(),
            plane: PlaneSelection::default().into(),
            col,
            row: 0,
        }
    }

    /// Requests for the first `count` columns of row 0.
    fn tile_row(count: i64) -> Vec<TileRequest> {
        (0..count).map(tile_request).collect()
    }

    /// Adaptive reader over a `CountingAdaptiveSource`, plus handles to the
    /// source's counters and the runtime under test.
    struct AdaptiveFixture {
        batch_reads: Arc<AtomicUsize>,
        requested_tiles: Arc<AtomicUsize>,
        runtime: Arc<DecodeRuntime>,
        reader: AdaptiveDecodeReader,
    }

    impl AdaptiveFixture {
        fn new(options: DecodeExecutionOptions) -> Self {
            let batch_reads = Arc::new(AtomicUsize::new(0));
            let requested_tiles = Arc::new(AtomicUsize::new(0));
            let runtime = Arc::new(DecodeRuntime::new(options).expect("decode runtime"));
            let source = CountingAdaptiveSource::new(
                Arc::clone(&batch_reads),
                Arc::clone(&requested_tiles),
            );
            let reader = AdaptiveDecodeReader::new(Box::new(source), Arc::clone(&runtime));
            Self {
                batch_reads,
                requested_tiles,
                runtime,
                reader,
            }
        }

        fn batch_reads(&self) -> usize {
            self.batch_reads.load(Ordering::SeqCst)
        }

        fn requested_tiles(&self) -> usize {
            self.requested_tiles.load(Ordering::SeqCst)
        }

        fn reset_counters(&self) {
            self.batch_reads.store(0, Ordering::SeqCst);
            self.requested_tiles.store(0, Ordering::SeqCst);
        }
    }

    #[test]
    fn default_decode_options_reuse_shared_runtime() {
        let first =
            DecodeRuntime::arc_for_options(DecodeExecutionOptions::default()).expect("runtime");
        let second =
            DecodeRuntime::arc_for_options(DecodeExecutionOptions::default()).expect("runtime");

        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn route_cache_is_bounded() {
        let runtime = DecodeRuntime::new(DecodeExecutionOptions::default()).expect("runtime");
        let first_key = route_key_for_test(0);

        for sequence in 0..ROUTE_CACHE_MAX_ENTRIES + 5 {
            runtime.store_route(
                route_key_for_test(sequence),
                DecodeRouteDecision::measured(
                    1,
                    Duration::from_millis(2),
                    Duration::from_millis(1),
                    1,
                ),
            );
        }

        let cache_len = runtime
            .route_cache
            .lock()
            .unwrap_or_else(|err| err.into_inner())
            .len();
        assert_eq!(cache_len, ROUTE_CACHE_MAX_ENTRIES);
        assert!(runtime.cached_route(&first_key).is_none());
        assert!(runtime
            .cached_route(&route_key_for_test(ROUTE_CACHE_MAX_ENTRIES + 4))
            .is_some());
    }

    /// Distinct key per `sequence` so each insert occupies its own slot.
    fn route_key_for_test(sequence: usize) -> DecodeRouteKey {
        DecodeRouteKey {
            dataset_id: sequence as u128,
            scene: 0,
            series: 0,
            level: 0,
            tile_grid: RouteTileGrid {
                tile_width: 128,
                tile_height: 128,
                tiles_across: 1,
                tiles_down: 1,
            },
            codec_kind: TileCodecKind::Jp2k,
            output_backend: OutputBackendRequest::Auto,
            device_backend_identity: format!("test-{sequence}"),
            sample_tile_count: 1,
        }
    }

    #[test]
    fn adaptive_route_reuses_device_cpu_fallback_sample_for_first_read() {
        let fixture =
            AdaptiveFixture::new(DecodeExecutionOptions::default().with_route_sample_size(4));

        let tiles = fixture
            .reader
            .read_tiles(
                &[tile_request(0)],
                TileOutputPreference::prefer_device_auto_with_compressed_decode(),
            )
            .expect("adaptive read");

        assert_eq!(tiles.len(), 1);
        assert_eq!(fixture.batch_reads(), 1);
        assert_eq!(fixture.requested_tiles(), 1);
    }

    #[test]
    fn adaptive_route_keys_subsampled_batches_separately() {
        let fixture =
            AdaptiveFixture::new(DecodeExecutionOptions::default().with_route_sample_size(4));
        let req = tile_request(0);
        let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
        let single_key = route_key_for_batch(
            fixture.reader.inner.as_ref(),
            std::slice::from_ref(&req),
            &output,
            4,
        )
        .expect("route key is available for one-tile JP2K regular batch");
        let full_sample_key = route_key_for_batch(
            fixture.reader.inner.as_ref(),
            &[req.clone(), req.clone(), req.clone(), req.clone()],
            &output,
            4,
        )
        .expect("route key is available for JP2K regular tile");

        let tiles = fixture
            .reader
            .read_tiles(&[req], output)
            .expect("adaptive read");

        assert_eq!(tiles.len(), 1);
        assert_eq!(fixture.batch_reads(), 1);
        assert_eq!(fixture.requested_tiles(), 1);
        assert!(
            fixture.runtime.cached_route(&single_key).is_some(),
            "a one-tile read should cache its own route"
        );
        assert!(
            fixture.runtime.cached_route(&full_sample_key).is_none(),
            "a one-tile read must not poison the route for four-plus-tile batches"
        );
    }

    #[test]
    fn default_route_sample_covers_viewer_sized_dicom_device_batches() {
        let batch_reads = Arc::new(AtomicUsize::new(0));
        let requested_tiles = Arc::new(AtomicUsize::new(0));
        let reader = CountingAdaptiveSource::new(batch_reads, requested_tiles);
        let reqs = tile_row(15);
        let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();

        let key = route_key_for_batch(
            &reader,
            &reqs,
            &output,
            DecodeExecutionOptions::default().route_sample_size(),
        )
        .expect("route key is available for a viewer-sized JP2K batch");

        assert_eq!(
            key.sample_tile_count, 15,
            "default adaptive sampling must measure a real visible-tile batch instead of undersampling into the CPU path"
        );
    }

    #[test]
    fn adaptive_route_sends_large_jp2k_batches_to_device_preferred_reader_without_sampling() {
        let fixture = AdaptiveFixture::new(DecodeExecutionOptions::default());
        let reqs = tile_row(15);
        let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
        let key = route_key_for_batch(
            fixture.reader.inner.as_ref(),
            &reqs,
            &output,
            DecodeExecutionOptions::default().route_sample_size(),
        )
        .expect("route key is available for a viewer-sized JP2K batch");

        let tiles = fixture
            .reader
            .read_tiles(&reqs, output)
            .expect("adaptive read");

        assert_eq!(tiles.len(), 15);
        assert_eq!(
            fixture.batch_reads(),
            1,
            "large JP2K batches should avoid cold adaptive double-decode"
        );
        assert_eq!(fixture.requested_tiles(), 15);
        assert!(
            fixture.runtime.cached_route(&key).is_none(),
            "direct large-batch routing should not cache a CPU-biased sample"
        );
    }

    #[test]
    fn adaptive_route_samples_uncached_subthreshold_batches_before_routing_remainder() {
        let fixture =
            AdaptiveFixture::new(DecodeExecutionOptions::default().with_route_sample_size(4));
        let reqs = tile_row(7);
        let output = TileOutputPreference::prefer_device_auto_with_compressed_decode();
        let key = route_key_for_batch(fixture.reader.inner.as_ref(), &reqs, &output, 4)
            .expect("route key is available for JP2K regular tile");

        let tiles = fixture
            .reader
            .read_tiles(&reqs, output.clone())
            .expect("adaptive read");

        assert_eq!(tiles.len(), 7);
        assert_eq!(
            fixture.batch_reads(),
            2,
            "uncached subthreshold batches should sample, cache a route, then route the remainder"
        );
        assert_eq!(fixture.requested_tiles(), 7);
        assert!(
            fixture.runtime.cached_route(&key).is_some(),
            "subthreshold auto routing should cache a measured route"
        );

        fixture.reset_counters();
        let tiles = fixture
            .reader
            .read_tiles(&reqs, output)
            .expect("cached adaptive read");

        assert_eq!(tiles.len(), 7);
        assert_eq!(
            fixture.batch_reads(),
            1,
            "cached subthreshold routes should not resample"
        );
        assert_eq!(fixture.requested_tiles(), 7);
    }
}