use crate::PipelineConfig;
use crate::octree::{OctreeBuilder, ScanResult, point_to_key};
use crate::validate::ValidatedInputs;
use anyhow::{Context, Result};
use rayon::prelude::*;
use std::path::PathBuf;
use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use tracing::{debug, info};
/// Marker in the merge grids for a cell whose children were already emitted
/// as chunks; such a cell must never be merged further up.
const SENTINEL: i64 = -1;
/// Fraction of the memory budget assumed usable when deriving the per-worker
/// chunk target (derates for untracked overhead).
const SAFETY_FACTOR: f64 = 0.6;
/// Estimated resident bytes per point while a chunk is being processed.
const PER_POINT_OVERHEAD_BYTES: u64 = 170;
/// Lower clamp for any computed chunk size, in points.
const MIN_CHUNK_POINTS: u64 = 1_000_000;
/// Estimated peak bytes per point for a single in-flight chunk
/// (used by `max_chunk_points`).
const PER_CHUNK_PEAK_BYTES_PER_POINT: u64 = 600;
/// Fraction of the memory budget one chunk may occupy at its peak.
const SINGLE_CHUNK_BUDGET_FRACTION: f64 = 0.4;
/// Desired number of chunks per worker, so workers stay busy.
const PARALLELISM_TARGET_PER_WORKER: u64 = 4;
/// Upper bound on points per chunk, derived from the memory budget.
///
/// One in-flight chunk may occupy at most `SINGLE_CHUNK_BUDGET_FRACTION` of
/// the budget at `PER_CHUNK_PEAK_BYTES_PER_POINT` bytes per point; the bound
/// never drops below `MIN_CHUNK_POINTS`.
pub(crate) fn max_chunk_points(memory_budget: u64) -> u64 {
    let chunk_budget = memory_budget as f64 * SINGLE_CHUNK_BUDGET_FRACTION;
    let capped = (chunk_budget / PER_CHUNK_PEAK_BYTES_PER_POINT as f64) as u64;
    MIN_CHUNK_POINTS.max(capped)
}
/// One spatial chunk selected by the planner: a cell of the counting grid at
/// some merge level, plus how many points were counted inside it.
#[derive(Debug, Clone, Copy)]
pub struct PlannedChunk {
    /// Merge level the cell belongs to: 0 is the single root cell,
    /// `grid_depth` is the finest (full-resolution) grid.
    pub level: u32,
    /// Cell x index within the grid at `level`.
    pub gx: i32,
    /// Cell y index within the grid at `level`.
    pub gy: i32,
    /// Cell z index within the grid at `level`.
    pub gz: i32,
    /// Number of points counted in this cell.
    pub point_count: u64,
}
/// Result of the counting/merging pass: the chosen grid geometry, the chunk
/// size target, and the list of chunks to process.
#[derive(Debug)]
pub struct ChunkPlan {
    /// Side length of the cubic counting grid (a power of two).
    pub grid_size: u32,
    /// log2 of `grid_size`; the number of merge levels above the root.
    pub grid_depth: u32,
    /// Target number of points per chunk used during merging.
    pub chunk_target: u64,
    /// Total points across all inputs (from the scan/header pass).
    pub total_points: u64,
    /// The planned chunks, in merge order.
    pub chunks: Vec<PlannedChunk>,
    /// Set when header-declared bounds disagree with observed bounds.
    pub header_mismatch: Option<HeaderBoundsMismatch>,
}
/// Per-axis disagreement between header-declared and actually observed bounds.
///
/// Each field is `Some((header_value, actual_value))` when that bound differs
/// by more than the axis's tolerance (one scale unit — see
/// `detect_header_mismatch`), `None` when it agrees.
#[derive(Debug, Clone, Copy)]
pub struct HeaderBoundsMismatch {
    pub min_x: Option<(f64, f64)>,
    pub max_x: Option<(f64, f64)>,
    pub min_y: Option<(f64, f64)>,
    pub max_y: Option<(f64, f64)>,
    pub min_z: Option<(f64, f64)>,
    pub max_z: Option<(f64, f64)>,
}
impl HeaderBoundsMismatch {
    /// True when at least one axis bound disagrees with the header.
    fn any(&self) -> bool {
        [
            self.min_x, self.max_x, self.min_y, self.max_y, self.min_z, self.max_z,
        ]
        .iter()
        .any(Option::is_some)
    }
}
impl ChunkPlan {
    /// Number of planned chunks.
    pub fn len(&self) -> usize {
        self.chunks.len()
    }

    /// True when the plan contains no chunks at all.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Compute the per-chunk point target from the memory budget, worker count,
/// and total point count.
///
/// The target is the smaller of two constraints — a memory-driven size (the
/// derated budget split evenly across workers) and a parallelism-driven size
/// (enough chunks to give every worker `PARALLELISM_TARGET_PER_WORKER` of
/// them) — clamped to `[MIN_CHUNK_POINTS, max_chunk_points(memory_budget)]`.
pub fn compute_chunk_target(memory_budget: u64, num_workers: usize, total_points: u64) -> u64 {
    let workers = num_workers.max(1) as u64;
    // Memory-driven target: each worker's share of the derated budget,
    // divided by the estimated per-point overhead.
    let raw = ((memory_budget as f64 * SAFETY_FACTOR) / (workers * PER_POINT_OVERHEAD_BYTES) as f64)
        as u64;
    // Parallelism-driven cap. `workers >= 1`, so the divisor is always >= 4;
    // the previous `> 0` guard (falling back to u64::MAX) was unreachable.
    let parallelism_target = workers * PARALLELISM_TARGET_PER_WORKER;
    let max_for_parallelism = (total_points / parallelism_target).max(MIN_CHUNK_POINTS);
    let max_chunk = max_chunk_points(memory_budget);
    raw.min(max_for_parallelism)
        .clamp(MIN_CHUNK_POINTS, max_chunk)
}
/// Pick the counting-grid side length (128, 256 or 512).
///
/// Larger point clouds prefer finer grids, but the grid's memory footprint
/// (4 bytes per cell, `GRID_BUDGET_FRACTION_BP` basis points of the budget)
/// may force a coarser one. Falls back to 128 when nothing fits.
pub fn select_grid_size(total_points: u64, memory_budget: u64) -> u32 {
    let preferred = if total_points < 100_000_000 {
        128
    } else if total_points < 500_000_000 {
        256
    } else {
        512
    };
    // 500 basis points = 5% of the budget may be spent on the grid.
    const GRID_BUDGET_FRACTION_BP: u64 = 500;
    let budget_bytes = memory_budget.saturating_mul(GRID_BUDGET_FRACTION_BP) / 10_000;
    let max_cells = budget_bytes / 4;
    // Largest candidate that both fits the budget and doesn't exceed the
    // preferred size; 128 is the unconditional fallback.
    [512u32, 256, 128]
        .into_iter()
        .find(|&candidate| (candidate as u64).pow(3) <= max_cells && candidate <= preferred)
        .unwrap_or(128)
}
/// Public entry point for chunk analysis: builds an `OctreeBuilder` from the
/// scan results, reports progress around the counting stage, and returns the
/// resulting chunk plan.
pub fn analyze_chunking(
    input_files: &[PathBuf],
    scan_results: &[ScanResult],
    validated: &ValidatedInputs,
    config: &PipelineConfig,
    chunk_target_override: Option<u64>,
) -> Result<ChunkPlan> {
    let builder = OctreeBuilder::from_scan(scan_results, validated, config)
        .context("constructing OctreeBuilder for chunk analysis")?;
    config.report(crate::ProgressEvent::StageStart {
        name: "Counting",
        total: builder.total_points,
    });
    // StageDone is only reported on success: an error from compute_chunk_plan
    // propagates without emitting the completion event.
    compute_chunk_plan(&builder, input_files, config, chunk_target_override).map(|plan| {
        config.report(crate::ProgressEvent::StageDone);
        plan
    })
}
/// Build the chunk plan: choose grid geometry and chunk target, run the
/// counting pass, detect header-bounds mismatches, and merge sparse cells
/// into chunks.
pub(crate) fn compute_chunk_plan(
    builder: &OctreeBuilder,
    input_files: &[PathBuf],
    config: &PipelineConfig,
    chunk_target_override: Option<u64>,
) -> Result<ChunkPlan> {
    let total_points = builder.total_points;
    let grid_size = select_grid_size(total_points, config.memory_budget);
    // grid_size is a power of two, so trailing_zeros() is its log2.
    let grid_depth = grid_size.trailing_zeros();
    let num_workers = distribute_worker_count(input_files.len());
    let chunk_target = match chunk_target_override {
        Some(target) => target,
        None => compute_chunk_target(config.memory_budget, num_workers, total_points),
    };
    info!(
        "Chunk plan: total_points={}, grid={}³ (depth {}), chunk_target={}, workers={}",
        total_points, grid_size, grid_depth, chunk_target, num_workers
    );
    let (grid, actual) = count_points(builder, input_files, grid_size, grid_depth, config)?;
    let occupied = grid
        .iter()
        .filter(|cell| cell.load(Ordering::Relaxed) > 0)
        .count();
    debug!(
        "Count pass: {} of {} cells touched ({:.2}%)",
        occupied,
        grid.len(),
        occupied as f64 / grid.len() as f64 * 100.0
    );
    let header_mismatch = detect_header_mismatch(builder, &actual);
    let chunks = merge_sparse_cells(&grid, grid_size, grid_depth, chunk_target);
    Ok(ChunkPlan {
        grid_size,
        grid_depth,
        chunk_target,
        total_points,
        chunks,
        header_mismatch,
    })
}
/// Compare the header-declared bounds against the observed bounds.
///
/// A per-axis mismatch is recorded as `(header, actual)` when the values
/// differ by more than one scale unit on that axis. Returns `None` when every
/// axis agrees within tolerance.
fn detect_header_mismatch(
    builder: &OctreeBuilder,
    actual: &ActualBounds,
) -> Option<HeaderBoundsMismatch> {
    // Flag a single bound when it deviates beyond the tolerance.
    fn flag(hdr: f64, act: f64, tol: f64) -> Option<(f64, f64)> {
        if (hdr - act).abs() > tol {
            Some((hdr, act))
        } else {
            None
        }
    }
    let hdr = &builder.bounds;
    let mismatch = HeaderBoundsMismatch {
        min_x: flag(hdr.min_x, actual.min_x, builder.scale_x),
        max_x: flag(hdr.max_x, actual.max_x, builder.scale_x),
        min_y: flag(hdr.min_y, actual.min_y, builder.scale_y),
        max_y: flag(hdr.max_y, actual.max_y, builder.scale_y),
        min_z: flag(hdr.min_z, actual.min_z, builder.scale_z),
        max_z: flag(hdr.max_z, actual.max_z, builder.scale_z),
    };
    if mismatch.any() { Some(mismatch) } else { None }
}
/// Axis-aligned bounding box of the coordinates actually observed during the
/// counting pass (after quantization to the file's scale/offset grid).
#[derive(Debug, Clone, Copy)]
pub(crate) struct ActualBounds {
    pub min_x: f64,
    pub min_y: f64,
    pub min_z: f64,
    pub max_x: f64,
    pub max_y: f64,
    pub max_z: f64,
}
impl ActualBounds {
    /// A bounding box no point can lie inside: mins start at +∞ and maxes at
    /// −∞, so the first point (or merge) replaces every field.
    fn empty() -> Self {
        Self {
            min_x: f64::INFINITY,
            max_x: f64::NEG_INFINITY,
            min_y: f64::INFINITY,
            max_y: f64::NEG_INFINITY,
            min_z: f64::INFINITY,
            max_z: f64::NEG_INFINITY,
        }
    }

    /// Grow `self` so it also covers `b`.
    fn merge(&mut self, b: &ActualBounds) {
        self.min_x = self.min_x.min(b.min_x);
        self.max_x = self.max_x.max(b.max_x);
        self.min_y = self.min_y.min(b.min_y);
        self.max_y = self.max_y.max(b.max_y);
        self.min_z = self.min_z.min(b.min_z);
        self.max_z = self.max_z.max(b.max_z);
    }
}
/// Count points into a dense `grid_size`³ grid of atomic counters while
/// tracking the actual (quantized) coordinate bounds observed in the inputs.
///
/// Input files are split into contiguous runs across workers; each worker
/// streams its files in buffers sized from its share of the memory budget.
/// Returns the counting grid and the merged bounds from all workers.
fn count_points(
    builder: &OctreeBuilder,
    input_files: &[PathBuf],
    grid_size: u32,
    grid_depth: u32,
    config: &PipelineConfig,
) -> Result<(Box<[AtomicU32]>, ActualBounds)> {
    let g = grid_size as usize;
    let n_cells = g * g * g;
    debug!(
        "Allocating counting grid: {}³ = {} cells, {} MB",
        grid_size,
        n_cells,
        n_cells * 4 / 1_048_576
    );
    // Zero-initialized atomic grid. Built as a plain u32 allocation first so
    // the allocator can hand back already-zeroed memory, then reinterpreted.
    let grid: Box<[AtomicU32]> = {
        let zeros: Vec<u32> = vec![0u32; n_cells];
        let boxed: Box<[u32]> = zeros.into_boxed_slice();
        let ptr = Box::into_raw(boxed) as *mut [AtomicU32];
        // SAFETY: AtomicU32 is documented to have the same in-memory
        // representation (size and alignment) as u32, so reinterpreting the
        // owned `[u32]` allocation as `[AtomicU32]` is sound. Ownership of
        // the exact pointer from Box::into_raw is handed straight back to
        // Box::from_raw, so there is no double free or leak.
        unsafe { Box::from_raw(ptr) }
    };
    let num_workers = distribute_worker_count(input_files.len());
    // Ceiling division: the last worker may get a short (or empty) run.
    let files_per_worker = input_files.len().div_ceil(num_workers);
    let progress = AtomicU64::new(0);
    // Assumed in-memory size of one las::Point, used only to size the read
    // buffer — TODO confirm against the point formats actually in use.
    const BYTES_PER_LAS_POINT: u64 = 120;
    let per_worker_budget = config.memory_budget / num_workers as u64;
    // Use 1/8 of each worker's budget for the read buffer, bounded to keep
    // both progress granularity and allocation size reasonable.
    let read_chunk_size =
        ((per_worker_budget / 8) / BYTES_PER_LAS_POINT).clamp(50_000, 2_000_000) as usize;
    debug!(
        "Counting with {} workers, {} files per worker, read_chunk_size={}",
        num_workers, files_per_worker, read_chunk_size
    );
    // Each worker accumulates its own bounds locally; the grid is shared and
    // updated with relaxed atomics (only the final counts matter).
    let per_worker_bounds: Vec<ActualBounds> = (0..num_workers)
        .into_par_iter()
        .map(|worker_id| -> Result<ActualBounds> {
            let mut local_bounds = ActualBounds::empty();
            let start = worker_id * files_per_worker;
            let end = (start + files_per_worker).min(input_files.len());
            if start >= end {
                // This worker has no files (file count not divisible).
                return Ok(local_bounds);
            }
            // One buffer per worker, reused across all reads.
            let mut points: Vec<las::Point> = Vec::with_capacity(read_chunk_size);
            for path in &input_files[start..end] {
                let mut reader = las::Reader::from_path(path)
                    .with_context(|| format!("Cannot open {:?}", path))?;
                loop {
                    points.clear();
                    let n = reader.read_points_into(read_chunk_size as u64, &mut points)?;
                    if n == 0 {
                        break;
                    }
                    for p in &points {
                        // Snap coordinates to the integer grid implied by the
                        // builder's scale/offset, then convert back, so the
                        // observed bounds reflect storable precision rather
                        // than raw f64 values.
                        let ix = ((p.x - builder.offset_x) / builder.scale_x).round() as i32;
                        let iy = ((p.y - builder.offset_y) / builder.scale_y).round() as i32;
                        let iz = ((p.z - builder.offset_z) / builder.scale_z).round() as i32;
                        let wx = ix as f64 * builder.scale_x + builder.offset_x;
                        let wy = iy as f64 * builder.scale_y + builder.offset_y;
                        let wz = iz as f64 * builder.scale_z + builder.offset_z;
                        if wx < local_bounds.min_x {
                            local_bounds.min_x = wx;
                        }
                        if wy < local_bounds.min_y {
                            local_bounds.min_y = wy;
                        }
                        if wz < local_bounds.min_z {
                            local_bounds.min_z = wz;
                        }
                        if wx > local_bounds.max_x {
                            local_bounds.max_x = wx;
                        }
                        if wy > local_bounds.max_y {
                            local_bounds.max_y = wy;
                        }
                        if wz > local_bounds.max_z {
                            local_bounds.max_z = wz;
                        }
                        // Map the quantized world position to a cell key at
                        // grid_depth (per-axis indices in key.x/key.y/key.z).
                        let key = point_to_key(
                            wx,
                            wy,
                            wz,
                            builder.cx,
                            builder.cy,
                            builder.cz,
                            builder.halfsize,
                            grid_depth,
                        );
                        // Clamp into range — guards points that land exactly
                        // on the upper boundary (or just past it via rounding).
                        let gx = (key.x as usize).min(g - 1);
                        let gy = (key.y as usize).min(g - 1);
                        let gz = (key.z as usize).min(g - 1);
                        let idx = gx + gy * g + gz * g * g;
                        grid[idx].fetch_add(1, Ordering::Relaxed);
                    }
                    // fetch_add returns the pre-update value; add n to report
                    // the post-update running total.
                    let done = progress.fetch_add(n, Ordering::Relaxed) + n;
                    config.report(crate::ProgressEvent::StageProgress { done });
                }
            }
            Ok(local_bounds)
        })
        .collect::<Result<Vec<_>>>()?;
    // Fold all per-worker bounds into the global result.
    let mut actual = ActualBounds::empty();
    for b in &per_worker_bounds {
        actual.merge(b);
    }
    Ok((grid, actual))
}
/// Collapse the dense counting grid bottom-up into chunks of at most
/// `chunk_target` points each (where possible).
///
/// Starting at the finest level (`grid_depth`), every 2×2×2 group of cells is
/// summed into its parent. If the combined population would exceed
/// `chunk_target` — or any child was already emitted at a finer level — the
/// non-empty children are emitted as chunks at the current level and the
/// parent is marked `SENTINEL` so it can never merge further up. Otherwise
/// the parent carries the sum and merging continues at the next coarser
/// level, ending with an optional single root chunk.
fn merge_sparse_cells(
    grid: &[AtomicU32],
    grid_size: u32,
    grid_depth: u32,
    chunk_target: u64,
) -> Vec<PlannedChunk> {
    let g = grid_size as usize;
    // Signed working copy so SENTINEL (-1) can mark "already emitted" cells.
    let mut current: Vec<i64> = grid
        .iter()
        .map(|c| c.load(Ordering::Relaxed) as i64)
        .collect();
    let mut current_size = g;
    let mut chunks: Vec<PlannedChunk> = Vec::new();
    // Walk from the finest level up to level 1; each iteration emits chunks
    // at `level` (child resolution) while building the grid for `level - 1`.
    for level in (1..=grid_depth).rev() {
        let parent_size = current_size / 2;
        let mut parent: Vec<i64> = vec![0; parent_size * parent_size * parent_size];
        for pz in 0..parent_size {
            for py in 0..parent_size {
                for px in 0..parent_size {
                    let mut sum: i64 = 0;
                    let mut any_blocked = false;
                    // The 8 child populations in dz/dy/dx order, saved so the
                    // emit loop below doesn't have to re-read `current`.
                    let mut child_pops = [0i64; 8];
                    let mut child_idx = 0;
                    for dz in 0..2 {
                        for dy in 0..2 {
                            for dx in 0..2 {
                                let cx = px * 2 + dx;
                                let cy = py * 2 + dy;
                                let cz = pz * 2 + dz;
                                let idx = cx + cy * current_size + cz * current_size * current_size;
                                let pop = current[idx];
                                child_pops[child_idx] = pop;
                                child_idx += 1;
                                if pop == SENTINEL {
                                    // Child was already split at a finer
                                    // level; this parent must not merge.
                                    any_blocked = true;
                                } else {
                                    sum += pop;
                                }
                            }
                        }
                    }
                    let parent_idx = px + py * parent_size + pz * parent_size * parent_size;
                    if any_blocked || sum as u64 > chunk_target {
                        // Merging would overshoot: emit each non-empty child
                        // as its own chunk. SENTINEL children (pop == -1)
                        // fail `pop > 0`, so nothing is emitted twice.
                        let mut child_idx = 0;
                        for dz in 0..2 {
                            for dy in 0..2 {
                                for dx in 0..2 {
                                    let pop = child_pops[child_idx];
                                    child_idx += 1;
                                    if pop > 0 {
                                        chunks.push(PlannedChunk {
                                            level,
                                            gx: (px * 2 + dx) as i32,
                                            gy: (py * 2 + dy) as i32,
                                            gz: (pz * 2 + dz) as i32,
                                            point_count: pop as u64,
                                        });
                                    }
                                }
                            }
                        }
                        parent[parent_idx] = SENTINEL;
                    } else if sum > 0 {
                        parent[parent_idx] = sum;
                    }
                }
            }
        }
        current = parent;
        current_size = parent_size;
    }
    // After the last iteration the grid is 1×1×1; whatever merged all the
    // way up becomes a single root-level chunk.
    debug_assert_eq!(current.len(), 1);
    let root_pop = current[0];
    if root_pop > 0 {
        chunks.push(PlannedChunk {
            level: 0,
            gx: 0,
            gy: 0,
            gz: 0,
            point_count: root_pop as u64,
        });
    }
    chunks
}
/// Pick how many parallel counting workers to run.
///
/// Aims for roughly two thirds of the rayon thread pool (but at least two),
/// then caps at the number of input files while never dropping below one.
fn distribute_worker_count(input_file_count: usize) -> usize {
    let cores = rayon::current_num_threads();
    let desired = ((cores * 2) / 3).max(2);
    desired.min(input_file_count).max(1)
}
#[cfg(test)]
mod tests {
    use super::*;

    // Grid size follows the total-point thresholds when the budget is ample.
    #[test]
    fn select_grid_thresholds() {
        let big_budget = 16 * 1024 * 1024 * 1024;
        assert_eq!(select_grid_size(1_000_000, big_budget), 128);
        assert_eq!(select_grid_size(99_999_999, big_budget), 128);
        assert_eq!(select_grid_size(100_000_000, big_budget), 256);
        assert_eq!(select_grid_size(499_999_999, big_budget), 256);
        assert_eq!(select_grid_size(500_000_000, big_budget), 512);
        assert_eq!(select_grid_size(100_000_000_000, big_budget), 512);
    }

    // A tiny budget forces the coarsest grid even for huge point counts.
    #[test]
    fn select_grid_size_falls_back_on_tiny_budget() {
        let tiny = 512 * 1024 * 1024;
        assert_eq!(select_grid_size(10_000_000_000, tiny), 128);
    }

    // A mid budget fits 256³ but not 512³.
    #[test]
    fn select_grid_size_256_fits_mid_budget() {
        let mid = 4 * 1024 * 1024 * 1024;
        assert_eq!(select_grid_size(10_000_000_000, mid), 256);
    }

    // The memory-driven term dominates: more budget → larger chunks,
    // more workers → smaller chunks.
    #[test]
    fn chunk_target_scales_with_budget() {
        let target = compute_chunk_target(64 * 1024 * 1024 * 1024, 5, 42_800_000_000);
        assert!((45_000_000..=47_000_000).contains(&target), "got {target}");
        let target = compute_chunk_target(32 * 1024 * 1024 * 1024, 10, 42_800_000_000);
        assert!((11_000_000..=13_000_000).contains(&target), "got {target}");
        let target = compute_chunk_target(16 * 1024 * 1024 * 1024, 4, 42_800_000_000);
        assert!((10_500_000..=11_800_000).contains(&target), "got {target}");
    }

    // The max_chunk_points cap keeps single chunks within the budget.
    #[test]
    fn chunk_target_dynamic_max_scales_with_budget() {
        let target = compute_chunk_target(4 * 1024 * 1024 * 1024, 1, 10_000_000_000);
        assert!(target <= 3_000_000, "got {target}, must be ≤ 3M");
        assert!(target >= MIN_CHUNK_POINTS);
    }

    // Few total points: the parallelism cap shrinks chunks so every worker
    // still gets several of them.
    #[test]
    fn chunk_target_parallelism_floor() {
        let target = compute_chunk_target(64 * 1024 * 1024 * 1024, 5, 50_000_000);
        assert!((2_000_000..=3_000_000).contains(&target));
    }

    // Starved budget clamps to the absolute minimum chunk size.
    #[test]
    fn chunk_target_clamps_to_min() {
        let target = compute_chunk_target(512 * 1024 * 1024, 32, 42_800_000_000);
        assert_eq!(target, MIN_CHUNK_POINTS);
    }

    // All 512 points fit under the target → everything merges to one root chunk.
    #[test]
    fn merge_single_chunk_when_fits() {
        let n = 8 * 8 * 8;
        let grid: Box<[AtomicU32]> = (0..n).map(|_| AtomicU32::new(1)).collect();
        let chunks = merge_sparse_cells(&grid, 8, 3, 1000);
        assert_eq!(chunks.len(), 1);
        assert_eq!(chunks[0].level, 0);
        assert_eq!(chunks[0].point_count, 512);
    }

    // Every 2×2×2 merge would exceed the target → each finest cell becomes
    // its own chunk.
    #[test]
    fn merge_splits_when_target_exceeded() {
        let n = 8 * 8 * 8;
        let grid: Box<[AtomicU32]> = (0..n).map(|_| AtomicU32::new(100)).collect();
        let chunks = merge_sparse_cells(&grid, 8, 3, 50);
        assert_eq!(chunks.len(), 512);
        assert!(chunks.iter().all(|c| c.level == 3 && c.point_count == 100));
    }

    // One hot cell blocks its ancestors; the rest still merge. Total points
    // must be conserved across the emitted chunks.
    #[test]
    fn merge_partial_collapse() {
        let n = 4 * 4 * 4;
        let grid: Box<[AtomicU32]> = (0..n)
            .map(|i| {
                if i == 0 {
                    AtomicU32::new(10000)
                } else {
                    AtomicU32::new(1)
                }
            })
            .collect();
        let chunks = merge_sparse_cells(&grid, 4, 2, 100);
        let total_pts: u64 = chunks.iter().map(|c| c.point_count).sum();
        assert_eq!(total_pts, 10000 + 63);
    }
}