use std::sync::Arc;
use crate::dedupe::DedupeStrategy;
use crate::engine::{
BlankTileStrategy, EngineConfig, EngineError, EngineResult, generate_pyramid_observed,
};
use crate::extensions::Extensions;
use crate::observe::{EngineObserver, NoopObserver};
use crate::planner::PyramidPlan;
use crate::raster::Raster;
use crate::resume::{ResumeMode, ResumePolicy};
use crate::retry::{FailurePolicy, RetryPolicy};
use crate::sink::TileSink;
use crate::streaming::{
BudgetPolicy, RasterStripSource, StreamingConfig, StripSource, generate_pyramid_streaming,
};
use crate::streaming_mapreduce::{MapReduceConfig, generate_pyramid_mapreduce};
/// Which pyramid-generation engine a run uses.
///
/// The default, [`EngineKind::Auto`], is resolved when the run starts: an
/// in-memory [`Raster`] source selects [`EngineKind::Monolithic`] and a strip
/// source selects [`EngineKind::Streaming`].
#[non_exhaustive]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum EngineKind {
    /// Pick the engine from the source type at run time (see above).
    #[default]
    Auto,
    /// Whole-raster engine; only accepts an in-memory `Raster` source.
    Monolithic,
    /// Strip-based engine configured with a memory budget and budget policy;
    /// raster sources are adapted through `RasterStripSource`.
    Streaming,
    /// Map-reduce strip engine; raster sources are adapted through
    /// `RasterStripSource`.
    MapReduce,
}
/// The pixel source a run reads from.
///
/// The `Monolithic` engine accepts only `Raster`. `Streaming` and `MapReduce`
/// accept both variants and wrap a raster in a `RasterStripSource`.
pub enum EngineSource<'a> {
    /// An in-memory raster, borrowed for the lifetime `'a`.
    Raster(&'a Raster),
    /// A type-erased strip reader owned by the builder.
    Strip(Box<dyn StripSource + 'a>),
}
impl<'a> std::fmt::Debug for EngineSource<'a> {
    /// Prints only the variant name. The wrapped raster or strip source is
    /// not formatted, so it does not need to implement `Debug` itself.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Raster(_) => "EngineSource::Raster",
            Self::Strip(_) => "EngineSource::Strip",
        };
        f.debug_tuple(label).finish()
    }
}
/// Conversion into an [`EngineSource`].
///
/// This lets [`EngineBuilder::new`] accept either a borrowed [`Raster`] or
/// any owned [`StripSource`] directly.
pub trait IntoEngineSource<'a> {
    /// Consumes `self` and wraps it in the matching [`EngineSource`] variant.
    fn into_engine_source(self) -> EngineSource<'a>;
}
/// A borrowed raster is stored as a reference; no pixel data is copied.
impl<'a> IntoEngineSource<'a> for &'a Raster {
    fn into_engine_source(self) -> EngineSource<'a> {
        let raster: &'a Raster = self;
        EngineSource::Raster(raster)
    }
}
/// Any strip source is boxed so the builder can hold it type-erased.
impl<'a, T: StripSource + 'a> IntoEngineSource<'a> for T {
    fn into_engine_source(self) -> EngineSource<'a> {
        let boxed: Box<dyn StripSource + 'a> = Box::new(self);
        EngineSource::Strip(boxed)
    }
}
/// Fluent configuration for one pyramid-generation run.
///
/// Every optional knob starts as `None`. `None` means "use the default of the
/// selected engine's configuration type".
pub struct EngineBuilder<'a, S: TileSink> {
    /// Pixel source: a borrowed raster or a boxed strip reader.
    source: EngineSource<'a>,
    /// The pyramid plan to generate.
    plan: PyramidPlan,
    /// Destination for produced tiles; returned by `run_collect`.
    sink: S,
    /// Requested engine; `Auto` is resolved from the source type.
    engine_kind: EngineKind,
    /// Optional observer; a `NoopObserver` is used when absent.
    observer: Option<Arc<dyn EngineObserver>>,
    /// Worker count override (MapReduce maps it to `tile_concurrency`).
    concurrency: Option<usize>,
    /// Buffer size override.
    buffer_size: Option<usize>,
    /// Background colour override.
    background_rgb: Option<[u8; 3]>,
    /// Blank-tile handling override.
    blank_strategy: Option<BlankTileStrategy>,
    /// Failure/retry policy override (engine config only).
    failure_policy: Option<FailurePolicy>,
    /// Deduplication strategy override (engine config only).
    dedupe: Option<DedupeStrategy>,
    /// Resume/checkpoint policy; `None` disables resume handling entirely.
    resume: Option<ResumePolicy>,
    /// Memory budget for the Streaming and MapReduce engines.
    memory_budget_bytes: Option<u64>,
    /// Budget policy for the Streaming engine.
    budget_policy: Option<BudgetPolicy>,
    /// Arbitrary typed values attached by callers.
    extensions: Extensions,
}
impl<'a, S: TileSink> std::fmt::Debug for EngineBuilder<'a, S> {
    /// Formats the configuration knobs.
    ///
    /// `plan`, `sink` and `observer` are left out, and the output ends with
    /// `..` to show that. As a result `S` does not need to implement `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("EngineBuilder");
        out.field("source", &self.source);
        out.field("engine_kind", &self.engine_kind);
        out.field("concurrency", &self.concurrency);
        out.field("buffer_size", &self.buffer_size);
        out.field("background_rgb", &self.background_rgb);
        out.field("blank_strategy", &self.blank_strategy);
        out.field("failure_policy", &self.failure_policy);
        out.field("dedupe", &self.dedupe);
        out.field("resume", &self.resume);
        out.field("memory_budget_bytes", &self.memory_budget_bytes);
        out.field("budget_policy", &self.budget_policy);
        out.field("extensions", &self.extensions);
        out.finish_non_exhaustive()
    }
}
impl<'a, S: TileSink> EngineBuilder<'a, S> {
    /// Starts a builder for `source`, `plan` and `sink`.
    ///
    /// `source` may be a borrowed [`Raster`] or any [`StripSource`]. Every
    /// knob starts unset, so the defaults of the selected engine's
    /// configuration apply. The engine kind starts as [`EngineKind::Auto`].
    pub fn new(source: impl IntoEngineSource<'a>, plan: PyramidPlan, sink: S) -> Self {
        Self {
            source: source.into_engine_source(),
            plan,
            sink,
            engine_kind: EngineKind::Auto,
            observer: None,
            concurrency: None,
            buffer_size: None,
            background_rgb: None,
            blank_strategy: None,
            failure_policy: None,
            dedupe: None,
            resume: None,
            memory_budget_bytes: None,
            budget_policy: None,
            extensions: Extensions::new(),
        }
    }
    /// Wraps `observer` in an `Arc` and passes it to the engine.
    ///
    /// Without an observer, a [`NoopObserver`] is used.
    pub fn with_observer(mut self, observer: impl EngineObserver + 'static) -> Self {
        self.observer = Some(Arc::new(observer));
        self
    }
    /// Like [`Self::with_observer`], for an observer that is already shared.
    pub fn with_observer_arc(mut self, observer: Arc<dyn EngineObserver>) -> Self {
        self.observer = Some(observer);
        self
    }
    /// Selects the engine explicitly, instead of letting `Auto` choose from the source type.
    pub fn with_engine(mut self, kind: EngineKind) -> Self {
        self.engine_kind = kind;
        self
    }
    /// Applies the settings carried by an [`EngineConfig`].
    ///
    /// - Concurrency, buffer size, background colour, blank-tile strategy and
    ///   failure policy always overwrite earlier setter calls.
    /// - The dedupe strategy overwrites only when `config` carries one.
    /// - When `config` requests checkpointing, its interval and root are
    ///   merged into the resume policy. An overwrite policy is created if none
    ///   is set, and values the policy already has are kept.
    pub fn with_config(mut self, config: EngineConfig) -> Self {
        self.concurrency = Some(config.concurrency);
        self.buffer_size = Some(config.buffer_size);
        self.background_rgb = Some(config.background_rgb);
        self.blank_strategy = Some(config.blank_tile_strategy);
        self.failure_policy = Some(config.failure_policy);
        if let Some(ds) = config.dedupe_strategy {
            self.dedupe = Some(ds);
        }
        if config.checkpoint_every != 0 || config.checkpoint_root.is_some() {
            let mut policy = self.resume.unwrap_or_else(ResumePolicy::overwrite);
            if config.checkpoint_every != 0 && policy.checkpoint_every() == 0 {
                policy = policy.with_checkpoint_every(config.checkpoint_every);
            }
            if policy.checkpoint_root().is_none() {
                if let Some(root) = config.checkpoint_root {
                    policy = policy.with_checkpoint_root(root);
                }
            }
            self.resume = Some(policy);
        }
        self
    }
    /// Sets the failure policy.
    ///
    /// The policy is carried in the engine configuration. It is not forwarded
    /// to the MapReduce engine, whose configuration is built without it.
    pub fn with_failure_policy(mut self, policy: FailurePolicy) -> Self {
        self.failure_policy = Some(policy);
        self
    }
    /// Shorthand for `with_failure_policy(FailurePolicy::RetryThenFail(policy))`.
    pub fn with_retry(self, policy: RetryPolicy) -> Self {
        self.with_failure_policy(FailurePolicy::RetryThenFail(policy))
    }
    /// Enables resume handling. The policy's mode selects the behavior:
    ///
    /// - Overwrite wipes the checkpoint directory.
    /// - Resume skips tiles recorded in the checkpoint.
    /// - Verify checks existing output instead of generating it.
    pub fn with_resume(mut self, policy: ResumePolicy) -> Self {
        self.resume = Some(policy);
        self
    }
    /// Sets the dedupe strategy.
    ///
    /// This is an engine-config setting only; it is not forwarded to MapReduce.
    pub fn with_dedupe(mut self, strategy: DedupeStrategy) -> Self {
        self.dedupe = Some(strategy);
        self
    }
    /// Sets how blank tiles are handled.
    pub fn with_blank_strategy(mut self, strategy: BlankTileStrategy) -> Self {
        self.blank_strategy = Some(strategy);
        self
    }
    /// Sets the background colour.
    pub fn with_background_rgb(mut self, rgb: [u8; 3]) -> Self {
        self.background_rgb = Some(rgb);
        self
    }
    /// Sets the worker count. MapReduce applies it as `tile_concurrency`.
    pub fn with_concurrency(mut self, n: usize) -> Self {
        self.concurrency = Some(n);
        self
    }
    /// Sets the buffer size.
    pub fn with_buffer_size(mut self, n: usize) -> Self {
        self.buffer_size = Some(n);
        self
    }
    /// Sets the memory budget. Only the Streaming and MapReduce engines read it.
    pub fn with_memory_budget(mut self, bytes: u64) -> Self {
        self.memory_budget_bytes = Some(bytes);
        self
    }
    /// Sets the budget policy. Only the Streaming engine reads it.
    pub fn with_budget_policy(mut self, policy: BudgetPolicy) -> Self {
        self.budget_policy = Some(policy);
        self
    }
    /// Attaches a typed value to the builder, replacing any previous value of type `T`.
    ///
    /// Extensions are kept on the builder but are not consumed by
    /// [`Self::run`] or [`Self::run_collect`].
    pub fn with_extension<T: Send + Sync + 'static>(mut self, value: T) -> Self {
        self.extensions.insert(value);
        self
    }
    /// Returns the attached extension of type `T`, if one is present.
    pub fn extension<T: Send + Sync + 'static>(&self) -> Option<&T> {
        self.extensions.get::<T>()
    }
    /// Returns all attached extensions.
    pub fn extensions(&self) -> &Extensions {
        &self.extensions
    }
    /// Runs the configured pipeline and drops the sink.
    ///
    /// See [`Self::run_collect`] for behavior and errors.
    pub fn run(self) -> Result<EngineResult, EngineError> {
        let (result, _sink) = self.run_collect()?;
        Ok(result)
    }
    /// Runs the configured pipeline and returns the result together with the sink.
    ///
    /// With a resume policy:
    ///
    /// - Its checkpoint interval and root, when set, are copied into the
    ///   engine configuration.
    /// - In Verify mode, existing output is verified instead of generated.
    /// - In Resume mode, tiles recorded in the checkpoint are not rewritten,
    ///   and `tiles_produced` excludes them.
    /// - The checkpoint is flushed after a successful run.
    ///
    /// # Errors
    ///
    /// - [`EngineError::IncompatibleSource`] if the Monolithic engine is
    ///   paired with a strip source.
    /// - [`EngineError::PlanHashMismatch`] if the saved checkpoint belongs to
    ///   a different plan.
    /// - [`EngineError::ResumeFailed`] if wiping or flushing the checkpoint fails.
    /// - Any error raised by the selected engine or verifier.
    pub fn run_collect(self) -> Result<(EngineResult, S), EngineError> {
        let EngineBuilder {
            source,
            plan,
            sink,
            engine_kind,
            observer,
            concurrency,
            buffer_size,
            background_rgb,
            blank_strategy,
            failure_policy,
            dedupe,
            resume,
            memory_budget_bytes,
            budget_policy,
            extensions: _extensions,
        } = self;
        let mut engine_cfg = build_engine_config(
            concurrency,
            buffer_size,
            background_rgb,
            blank_strategy,
            failure_policy,
            dedupe,
        );
        let observer_ref: &dyn EngineObserver = match &observer {
            Some(arc) => arc.as_ref(),
            None => &NoopObserver,
        };
        let kind = resolve_engine_kind(engine_kind, &source);
        // Reject before any resume bookkeeping. In Overwrite mode,
        // `prepare_resume_state` wipes the checkpoint directory, and that
        // must never happen for a run that cannot start.
        if matches!(kind, EngineKind::Monolithic) && matches!(source, EngineSource::Strip(_)) {
            return Err(monolithic_requires_raster());
        }
        let tuning = EngineTuning {
            memory_budget_bytes,
            budget_policy,
            concurrency,
            buffer_size,
            background_rgb,
            blank_strategy,
        };
        if let Some(policy) = resume {
            if policy.checkpoint_every() > 0 {
                engine_cfg = engine_cfg.with_checkpoint_every(policy.checkpoint_every());
            }
            if let Some(root) = policy.checkpoint_root() {
                engine_cfg = engine_cfg.with_checkpoint_root(root.to_path_buf());
            }
            if matches!(policy.mode(), ResumeMode::Verify) {
                let result =
                    dispatch_verify(kind, source, &plan, &sink, &engine_cfg, observer_ref)?;
                return Ok((result, sink));
            }
            let (skip, cp) = prepare_resume_state(&sink, &plan, &engine_cfg, policy.mode())?;
            let wrapped = resume::ResumeAwareSink::new(&sink, &skip, cp.as_ref());
            let mut result =
                dispatch_generate(kind, source, &plan, &wrapped, engine_cfg, observer_ref, tuning)?;
            // Tiles suppressed by the wrapper were reported as produced by the engine.
            result.tiles_produced = result.tiles_produced.saturating_sub(wrapped.skipped_count());
            if let Some(cp) = cp.as_ref() {
                cp.flush().map_err(EngineError::ResumeFailed)?;
            }
            return Ok((result, sink));
        }
        let result =
            dispatch_generate(kind, source, &plan, &sink, engine_cfg, observer_ref, tuning)?;
        Ok((result, sink))
    }
}

/// Builder knobs that only some engines consume, grouped so they can be passed
/// to [`dispatch_generate`] as one value.
struct EngineTuning {
    memory_budget_bytes: Option<u64>,
    budget_policy: Option<BudgetPolicy>,
    concurrency: Option<usize>,
    buffer_size: Option<usize>,
    background_rgb: Option<[u8; 3]>,
    blank_strategy: Option<BlankTileStrategy>,
}

/// The error returned when the Monolithic engine is paired with a strip source.
fn monolithic_requires_raster() -> EngineError {
    EngineError::IncompatibleSource {
        kind: EngineKind::Monolithic,
        reason: "Monolithic engine requires an in-memory Raster source",
    }
}

/// Runs the generating engine for `kind`, writing tiles into `sink`.
///
/// The sink may be the caller's sink or a resume-aware wrapper around it.
/// `kind` must already be resolved (not `Auto`).
fn dispatch_generate<T: TileSink>(
    kind: EngineKind,
    source: EngineSource<'_>,
    plan: &PyramidPlan,
    sink: &T,
    engine_cfg: EngineConfig,
    observer: &dyn EngineObserver,
    tuning: EngineTuning,
) -> Result<EngineResult, EngineError> {
    let EngineTuning {
        memory_budget_bytes,
        budget_policy,
        concurrency,
        buffer_size,
        background_rgb,
        blank_strategy,
    } = tuning;
    let result = match (kind, source) {
        (EngineKind::Monolithic, EngineSource::Raster(raster)) => {
            generate_pyramid_observed(raster, plan, sink, &engine_cfg, observer)?
        }
        (EngineKind::Monolithic, EngineSource::Strip(_)) => {
            return Err(monolithic_requires_raster());
        }
        (EngineKind::Streaming, EngineSource::Raster(raster)) => {
            let strip = RasterStripSource::new(raster);
            let cfg = build_streaming_config(engine_cfg, memory_budget_bytes, budget_policy);
            generate_pyramid_streaming(&strip, plan, sink, &cfg, observer)?
        }
        (EngineKind::Streaming, EngineSource::Strip(strip)) => {
            let cfg = build_streaming_config(engine_cfg, memory_budget_bytes, budget_policy);
            generate_pyramid_streaming(strip.as_ref(), plan, sink, &cfg, observer)?
        }
        (EngineKind::MapReduce, EngineSource::Raster(raster)) => {
            let strip = RasterStripSource::new(raster);
            let cfg = build_mapreduce_config(
                memory_budget_bytes,
                concurrency,
                buffer_size,
                background_rgb,
                blank_strategy,
            );
            generate_pyramid_mapreduce(&strip, plan, sink, &cfg, observer)?
        }
        (EngineKind::MapReduce, EngineSource::Strip(strip)) => {
            let cfg = build_mapreduce_config(
                memory_budget_bytes,
                concurrency,
                buffer_size,
                background_rgb,
                blank_strategy,
            );
            generate_pyramid_mapreduce(strip.as_ref(), plan, sink, &cfg, observer)?
        }
        (EngineKind::Auto, _) => {
            unreachable!("Auto should have been resolved before match")
        }
    };
    Ok(result)
}

/// Verifies existing output in `sink` with the verifier that matches `kind`.
///
/// `kind` must already be resolved (not `Auto`).
fn dispatch_verify<T: TileSink>(
    kind: EngineKind,
    source: EngineSource<'_>,
    plan: &PyramidPlan,
    sink: &T,
    engine_cfg: &EngineConfig,
    observer: &dyn EngineObserver,
) -> Result<EngineResult, EngineError> {
    let result = match (kind, source) {
        (EngineKind::Monolithic, EngineSource::Raster(raster)) => {
            crate::verify::raster_verify(raster, plan, sink, engine_cfg, observer)?
        }
        (EngineKind::Monolithic, EngineSource::Strip(_)) => {
            return Err(monolithic_requires_raster());
        }
        (EngineKind::Streaming, EngineSource::Raster(raster))
        | (EngineKind::MapReduce, EngineSource::Raster(raster)) => {
            let strip = RasterStripSource::new(raster);
            crate::verify::verify_from_strip_source(&strip, plan, sink, engine_cfg, observer)?
        }
        (EngineKind::Streaming, EngineSource::Strip(strip))
        | (EngineKind::MapReduce, EngineSource::Strip(strip)) => {
            crate::verify::verify_from_strip_source(
                strip.as_ref(),
                plan,
                sink,
                engine_cfg,
                observer,
            )?
        }
        (EngineKind::Auto, _) => {
            unreachable!("Auto should have been resolved before match")
        }
    };
    Ok(result)
}
/// Builds an [`EngineConfig`] from its defaults and the builder overrides.
///
/// A `None` override keeps the default. Overrides are applied in this fixed
/// order: concurrency, buffer size, background colour, blank-tile strategy,
/// failure policy, dedupe.
fn build_engine_config(
    concurrency: Option<usize>,
    buffer_size: Option<usize>,
    background_rgb: Option<[u8; 3]>,
    blank_strategy: Option<BlankTileStrategy>,
    failure_policy: Option<FailurePolicy>,
    dedupe: Option<DedupeStrategy>,
) -> EngineConfig {
    let base = EngineConfig::default();
    let base = match concurrency {
        Some(workers) => base.with_concurrency(workers),
        None => base,
    };
    let mut base = match buffer_size {
        Some(capacity) => base.with_buffer_size(capacity),
        None => base,
    };
    if let Some(colour) = background_rgb {
        base.background_rgb = colour;
    }
    let base = match blank_strategy {
        Some(strategy) => base.with_blank_tile_strategy(strategy),
        None => base,
    };
    let base = match failure_policy {
        Some(policy) => base.with_failure_policy(policy),
        None => base,
    };
    match dedupe {
        Some(strategy) => base.with_dedupe_strategy(strategy),
        None => base,
    }
}
/// Wraps `engine` in a [`StreamingConfig`].
///
/// Unset budget values fall back to `StreamingConfig::default()`.
fn build_streaming_config(
    engine: EngineConfig,
    memory_budget_bytes: Option<u64>,
    budget_policy: Option<BudgetPolicy>,
) -> StreamingConfig {
    let fallback = StreamingConfig::default();
    let budget = match memory_budget_bytes {
        Some(bytes) => bytes,
        None => fallback.memory_budget_bytes,
    };
    let policy = match budget_policy {
        Some(chosen) => chosen,
        None => fallback.budget_policy,
    };
    StreamingConfig {
        engine,
        memory_budget_bytes: budget,
        budget_policy: policy,
    }
}
/// Builds a [`MapReduceConfig`] from its defaults and the builder overrides.
///
/// A `None` override keeps the default. `concurrency` maps to
/// `tile_concurrency`.
fn build_mapreduce_config(
    memory_budget_bytes: Option<u64>,
    concurrency: Option<usize>,
    buffer_size: Option<usize>,
    background_rgb: Option<[u8; 3]>,
    blank_strategy: Option<BlankTileStrategy>,
) -> MapReduceConfig {
    let mut out = MapReduceConfig::default();
    // These fields are plain `Copy` values, so reading the default back is free.
    out.memory_budget_bytes = memory_budget_bytes.unwrap_or(out.memory_budget_bytes);
    out.tile_concurrency = concurrency.unwrap_or(out.tile_concurrency);
    out.buffer_size = buffer_size.unwrap_or(out.buffer_size);
    out.background_rgb = background_rgb.unwrap_or(out.background_rgb);
    if let Some(strategy) = blank_strategy {
        out.blank_tile_strategy = strategy;
    }
    out
}
/// Replaces `Auto` with a concrete engine and leaves any explicit choice unchanged.
///
/// `Auto` becomes Monolithic for an in-memory raster and Streaming for a
/// strip source.
fn resolve_engine_kind(kind: EngineKind, source: &EngineSource<'_>) -> EngineKind {
    if kind != EngineKind::Auto {
        return kind;
    }
    match source {
        EngineSource::Raster(_) => EngineKind::Monolithic,
        EngineSource::Strip(_) => EngineKind::Streaming,
    }
}
/// Builds the skip set and checkpoint state for a resume-enabled run.
///
/// - `Overwrite`: wipes the resolved checkpoint root, if there is one, and
///   starts with an empty skip set.
/// - `Resume`: loads the saved job checkpoint from the resolved root, if
///   there is one. Tiles it records as completed are skipped.
/// - `Verify`: must be handled by the caller before this function is reached.
///
/// # Errors
///
/// - [`EngineError::ResumeFailed`] if wiping the checkpoint directory fails.
/// - [`EngineError::PlanHashMismatch`] if the saved checkpoint was recorded
///   for a plan whose hash differs from `plan`'s.
/// - Any error from loading the checkpoint.
fn prepare_resume_state(
    sink: &dyn TileSink,
    plan: &PyramidPlan,
    config: &EngineConfig,
    mode: ResumeMode,
) -> Result<
    (
        std::collections::HashSet<crate::planner::TileCoord>,
        Option<crate::engine::CheckpointState>,
    ),
    EngineError,
> {
    match mode {
        ResumeMode::Overwrite => {
            if let Some(root) = crate::engine::resolve_checkpoint_root(config, sink) {
                crate::engine::wipe_directory(&root)
                    .map_err(|e| EngineError::ResumeFailed(crate::resume::ResumeError::from(e)))?;
            }
            let cp = crate::engine::cp_for_sink(sink, plan, config, Vec::new(), Vec::new());
            Ok((std::collections::HashSet::new(), cp))
        }
        ResumeMode::Resume => {
            // Hash of the plan being run now, compared with the saved checkpoint's hash.
            let current_hash = crate::resume::compute_plan_hash(plan);
            let saved = match crate::engine::resolve_checkpoint_root(config, sink) {
                Some(root) => crate::resume::JobCheckpoint::load(&root)?,
                None => None,
            };
            let (completed, levels) = match saved {
                Some(meta) => {
                    if meta.plan_hash != current_hash {
                        // `expected` is what the checkpoint was recorded for;
                        // `got` is the plan supplied to this run.
                        return Err(EngineError::PlanHashMismatch {
                            expected: meta.plan_hash,
                            got: current_hash,
                        });
                    }
                    (meta.completed_tiles, meta.levels_completed)
                }
                None => (Vec::new(), Vec::new()),
            };
            let skip: std::collections::HashSet<crate::planner::TileCoord> =
                completed.iter().copied().collect();
            let cp = crate::engine::cp_for_sink(sink, plan, config, completed, levels);
            Ok((skip, cp))
        }
        ResumeMode::Verify => {
            unreachable!("Verify is handled by run_collect before resume state is prepared")
        }
    }
}
/// Sink adapter used by resume-enabled runs.
mod resume {
    use std::collections::HashSet;
    use std::path::Path;
    use std::sync::atomic::{AtomicU64, Ordering};
    use crate::engine::{CheckpointState, EngineConfig};
    use crate::planner::TileCoord;
    use crate::sink::{SinkError, Tile, TileSink};

    /// Wraps a sink so that resumed runs skip finished tiles.
    ///
    /// Tiles already recorded as completed are dropped (and counted) instead
    /// of being rewritten. Tiles that are written are then marked completed
    /// in the checkpoint, when one exists. Every other `TileSink` method
    /// forwards to the inner sink.
    pub(super) struct ResumeAwareSink<'a, S: TileSink + ?Sized> {
        /// The real destination sink.
        pub(super) inner: &'a S,
        /// Coordinates that an earlier run already completed.
        pub(super) skip: &'a HashSet<TileCoord>,
        /// Checkpoint that records newly written tiles, if checkpointing is enabled.
        pub(super) cp: Option<&'a CheckpointState>,
        /// Number of tiles dropped because they were in `skip`.
        pub(super) skipped: AtomicU64,
    }
    impl<'a, S: TileSink + ?Sized> ResumeAwareSink<'a, S> {
        /// Creates a wrapper with a zero skip counter.
        pub(super) fn new(
            inner: &'a S,
            skip: &'a HashSet<TileCoord>,
            cp: Option<&'a CheckpointState>,
        ) -> Self {
            Self {
                inner,
                skip,
                cp,
                skipped: AtomicU64::new(0),
            }
        }
        /// Returns how many tiles have been skipped so far.
        ///
        /// NOTE(review): Relaxed ordering assumes the engine has joined its
        /// workers before this is read after the run. Confirm this for
        /// every engine.
        pub(super) fn skipped_count(&self) -> u64 {
            self.skipped.load(Ordering::Relaxed)
        }
    }
    impl<'a, S: TileSink + ?Sized> TileSink for ResumeAwareSink<'a, S> {
        fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
            if self.skip.contains(&tile.coord) {
                // The counter is only a tally, so no ordering with other memory is needed.
                self.skipped.fetch_add(1, Ordering::Relaxed);
                return Ok(());
            }
            self.inner.write_tile(tile)?;
            // Mark the tile only after the inner write succeeded, so the checkpoint
            // never claims a tile that was not written.
            if let Some(cp) = self.cp {
                cp.mark_tile_completed(tile.coord)
                    .map_err(|e| SinkError::Other(format!("checkpoint: {e}")))?;
            }
            Ok(())
        }
        fn finish(&self) -> Result<(), SinkError> {
            self.inner.finish()
        }
        fn record_engine_config(&self, config: &EngineConfig) {
            self.inner.record_engine_config(config)
        }
        fn sink_retry_count(&self) -> u64 {
            self.inner.sink_retry_count()
        }
        fn sink_skipped_due_to_failure(&self) -> u64 {
            self.inner.sink_skipped_due_to_failure()
        }
        fn note_sink_skipped(&self) {
            self.inner.note_sink_skipped()
        }
        fn checkpoint_root(&self) -> Option<&Path> {
            self.inner.checkpoint_root()
        }
        fn init_level_count(&self, levels: usize) {
            self.inner.init_level_count(levels)
        }
    }
}