use crate::PipelineConfig;
use crate::copc_types::VoxelKey;
use crate::node_store::{FileNodeStore, NodeStore, PackedNodeStore};
use anyhow::{Context, Result};
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use rayon::prelude::*;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fs::{File, OpenOptions};
use std::io::{BufReader, BufWriter, Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use tracing::{debug, info};
// Global budget of simultaneously open per-chunk temp files, split across all
// distribute workers (see `OctreeBuilder::distribute`).
const CHUNKED_OPEN_FILES_CAP: usize = 512;
// Floor for each worker's share of that budget so heavily parallel runs still
// keep a useful writer cache.
const MIN_PER_WORKER_CHUNK_FILES: usize = 32;
/// LRU-bounded cache of append-mode writers for per-chunk temp files, keeping
/// the number of open file handles under a fixed capacity.
struct ChunkWriterCache {
    // chunk index -> buffered append writer for that chunk's shard file
    writers: HashMap<u32, BufWriter<File>>,
    // Eviction order: front is least recently used, back is most recent.
    order: VecDeque<u32>,
    // Maximum number of writers kept open at once.
    capacity: usize,
    // Compression applied to every batch written through this cache.
    codec: TempCompression,
}
impl ChunkWriterCache {
    /// Creates an empty cache holding at most `capacity` open writers, all
    /// encoding batches with `codec`.
    fn new(capacity: usize, codec: TempCompression) -> Self {
        ChunkWriterCache {
            writers: HashMap::new(),
            order: VecDeque::new(),
            capacity,
            codec,
        }
    }

    /// Appends one encoded batch of `points` to chunk `chunk_idx`'s temp file
    /// under `shard_dir`, opening (and possibly evicting) writers as needed.
    fn append(&mut self, chunk_idx: u32, shard_dir: &Path, points: &[RawPoint]) -> Result<()> {
        if points.is_empty() {
            return Ok(());
        }
        if self.writers.contains_key(&chunk_idx) {
            // LRU touch: move this chunk to the back of the eviction order.
            if let Some(pos) = self.order.iter().position(|k| *k == chunk_idx) {
                self.order.remove(pos);
            }
            self.order.push_back(chunk_idx);
        } else {
            // Evict least-recently-used writers until there is room for one more.
            while self.writers.len() >= self.capacity {
                if let Some(victim) = self.order.pop_front() {
                    if let Some(mut w) = self.writers.remove(&victim) {
                        // Flush explicitly so buffered bytes are surfaced as an
                        // error here rather than swallowed by Drop.
                        w.flush().context("flush evicted chunk writer")?;
                    }
                } else {
                    break;
                }
            }
            // Append mode: an evicted chunk file may be reopened many times.
            let path = chunk_shard_path(shard_dir, chunk_idx);
            let f = OpenOptions::new()
                .create(true)
                .append(true)
                .open(&path)
                .with_context(|| format!("Cannot open chunk file {:?}", path))?;
            self.writers.insert(chunk_idx, BufWriter::new(f));
            self.order.push_back(chunk_idx);
        }
        let w = self.writers.get_mut(&chunk_idx).expect("just inserted");
        write_temp_batch(w, points, self.codec)?;
        Ok(())
    }

    /// Flushes and drops every cached writer and clears the LRU order.
    fn flush_all(&mut self) -> Result<()> {
        for (_, mut w) in self.writers.drain() {
            w.flush().context("flush chunk writer")?;
        }
        self.order.clear();
        Ok(())
    }
}
/// Path of chunk `chunk_idx`'s temp file inside one worker's shard directory.
fn chunk_shard_path(shard_dir: &Path, chunk_idx: u32) -> PathBuf {
    let file_name = format!("chunk_{chunk_idx}");
    shard_dir.join(file_name)
}
/// Path of the merged (canonical) temp file for chunk `chunk_idx`.
fn chunk_canonical_path(chunks_dir: &Path, chunk_idx: u32) -> PathBuf {
    let file_name = format!("chunk_{chunk_idx}");
    chunks_dir.join(file_name)
}
/// Number of parallel distribute workers: roughly two thirds of the rayon
/// thread pool (but at least 2), capped by the number of input files, and
/// never less than 1.
fn chunked_distribute_worker_count(input_file_count: usize) -> usize {
    let pool = rayon::current_num_threads();
    let two_thirds = (pool * 2) / 3;
    two_thirds.max(2).min(input_file_count).max(1)
}
/// Extra octree levels needed below a chunk root so that spreading
/// `point_count` points over `8^d` leaves averages at most `MAX_LEAF_POINTS`
/// per leaf. Capped at 13 levels.
fn chunk_local_extra_levels(point_count: u64) -> u32 {
    if point_count <= MAX_LEAF_POINTS {
        return 0;
    }
    let mut depth = 0u32;
    loop {
        let avg_per_leaf = point_count as f64 / 8u64.pow(depth) as f64;
        if avg_per_leaf <= MAX_LEAF_POINTS as f64 || depth > 12 {
            break;
        }
        depth += 1;
    }
    depth
}
/// Builds a flat lookup table over the full `grid_depth` grid mapping every
/// cell to the index of the planned chunk that covers it; cells not covered by
/// any chunk keep `u32::MAX`.
///
/// A chunk planned at a level shallower than the grid covers a contiguous
/// `stride³` block of cells, where `stride = 2^(grid_depth - chunk.level)`.
fn build_chunk_lut(plan: &crate::chunking::ChunkPlan) -> Vec<u32> {
    let g = plan.grid_size as usize;
    let n_cells = g * g * g;
    let mut lut = vec![u32::MAX; n_cells];
    for (chunk_idx, chunk) in plan.chunks.iter().enumerate() {
        let level_diff = plan.grid_depth as i32 - chunk.level as i32;
        debug_assert!(
            level_diff >= 0,
            "chunk level {} above grid depth {}",
            chunk.level,
            plan.grid_depth
        );
        let stride: usize = 1usize << level_diff as u32;
        // Clamp to the grid so a malformed plan cannot index past the LUT.
        let base_x = (chunk.gx as usize * stride).min(g);
        let base_y = (chunk.gy as usize * stride).min(g);
        let base_z = (chunk.gz as usize * stride).min(g);
        let end_x = (base_x + stride).min(g);
        let end_y = (base_y + stride).min(g);
        let end_z = (base_z + stride).min(g);
        // Fill row by row; x is the fastest-varying axis of the flat index.
        for z in base_z..end_z {
            for y in base_y..end_y {
                let row_start = base_x + y * g + z * g * g;
                let row_end = end_x + y * g + z * g * g;
                for cell in &mut lut[row_start..row_end] {
                    *cell = chunk_idx as u32;
                }
            }
        }
    }
    lut
}
/// Concatenates each chunk's per-worker shard files (in worker-index order)
/// into one canonical chunk file, then deletes the whole shard tree.
///
/// Shard directories are named `0..num_workers`; a missing shard file just
/// means that worker never saw points for that chunk. Batches written by
/// `write_temp_batch` are self-delimiting, so plain byte concatenation keeps
/// the result decodable. Chunks are merged in parallel.
fn merge_chunk_shards(shards_root: &Path, chunks_root: &Path, n_chunks: u32) -> Result<()> {
    // Count worker shard dirs; a missing root means nothing was distributed.
    let num_workers = match std::fs::read_dir(shards_root) {
        Ok(it) => it
            .filter_map(|e| e.ok())
            .filter(|e| e.path().is_dir())
            .count(),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => 0,
        Err(e) => return Err(e.into()),
    };
    debug!(
        "Merging {} chunks across {} worker shards",
        n_chunks, num_workers
    );
    (0..n_chunks)
        .into_par_iter()
        .try_for_each(|chunk_idx| -> Result<()> {
            let canonical = chunk_canonical_path(chunks_root, chunk_idx);
            let f = OpenOptions::new()
                .create(true)
                .append(true)
                .open(&canonical)
                .with_context(|| format!("opening canonical chunk file {:?}", canonical))?;
            let mut out = BufWriter::new(f);
            for w in 0..num_workers {
                let shard_path = shards_root
                    .join(w.to_string())
                    .join(format!("chunk_{chunk_idx}"));
                match File::open(&shard_path) {
                    Ok(f) => {
                        let mut reader = BufReader::new(f);
                        std::io::copy(&mut reader, &mut out).with_context(|| {
                            format!("copying shard {:?} into {:?}", shard_path, canonical)
                        })?;
                    }
                    // This worker wrote nothing for this chunk — fine.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                    Err(e) => return Err(e.into()),
                }
            }
            out.flush().context("flush merged chunk file")?;
            Ok(())
        })?;
    std::fs::remove_dir_all(shards_root)
        .with_context(|| format!("removing shards root {:?}", shards_root))?;
    Ok(())
}
// (parent key, child keys, child points each tagged with its child's index)
type SampleTask = (VoxelKey, Vec<VoxelKey>, Vec<(usize, RawPoint)>);
// (parent key, child keys, points promoted to the parent, leftovers per child)
type SampleResult = (VoxelKey, Vec<VoxelKey>, Vec<RawPoint>, Vec<Vec<RawPoint>>);
/// Interleaves the low 21 bits of `x`, `y` and `z` into a 63-bit Morton
/// (Z-order) code: bit `i` of `x` lands at position `3i`, `y` at `3i + 1`,
/// `z` at `3i + 2`.
fn morton3(x: u32, y: u32, z: u32) -> u64 {
    /// Spreads the low 21 bits of `value` so each bit occupies every third slot.
    #[inline]
    fn spread(value: u64) -> u64 {
        let mut v = value & 0x1F_FFFF;
        v = (v | (v << 32)) & 0x1F00000000FFFF;
        v = (v | (v << 16)) & 0x1F0000FF0000FF;
        v = (v | (v << 8)) & 0x100F00F00F00F00F;
        v = (v | (v << 4)) & 0x10C30C30C30C30C3;
        v = (v | (v << 2)) & 0x1249249249249249;
        v
    }
    spread(u64::from(x)) | (spread(u64::from(y)) << 1) | (spread(u64::from(z)) << 2)
}
/// Target maximum number of points stored in a single octree leaf node.
const MAX_LEAF_POINTS: u64 = 100_000;
/// Resolution of the occupancy grid used by `grid_sample` when promoting
/// points into a parent voxel (cells per axis).
const GRID_CELLS_PER_AXIS: i64 = 128;
/// A decoded LAS point in integer (scaled) coordinates, as stored in the
/// pipeline's temp files (see `RawPoint::write`/`read` for the layout).
#[derive(Debug, Clone)]
pub struct RawPoint {
    // Scaled integer coordinates: world = value * scale + offset.
    pub x: i32,
    pub y: i32,
    pub z: i32,
    pub intensity: u16,
    pub return_number: u8,
    pub number_of_returns: u8,
    pub classification: u8,
    // Scan angle in 0.006-degree increments (see `convert_point`).
    pub scan_angle: i16,
    pub user_data: u8,
    pub point_source_id: u16,
    pub gps_time: f64,
    pub red: u16,
    pub green: u16,
    pub blue: u16,
    pub nir: u16,
}
impl RawPoint {
    /// On-disk size of one serialized point; the sum mirrors the exact field
    /// order used by `write`, `read` and `write_bulk` below.
    pub const BYTE_SIZE: usize = 4 + 4 + 4 + 2 + 1 + 1 + 1 + 2 + 1 + 2 + 8 + 2 + 2 + 2 + 2;

    /// Serializes this point as fixed-width little-endian fields through one
    /// `write_all` call.
    #[allow(unused)]
    pub fn write<W: std::io::Write>(&self, w: &mut W) -> Result<()> {
        // Stage into a stack buffer so the sink receives a single write.
        let mut buf = [0u8; Self::BYTE_SIZE];
        {
            use std::io::Cursor;
            let mut c = Cursor::new(&mut buf[..]);
            c.write_i32::<LittleEndian>(self.x)?;
            c.write_i32::<LittleEndian>(self.y)?;
            c.write_i32::<LittleEndian>(self.z)?;
            c.write_u16::<LittleEndian>(self.intensity)?;
            c.write_u8(self.return_number)?;
            c.write_u8(self.number_of_returns)?;
            c.write_u8(self.classification)?;
            c.write_i16::<LittleEndian>(self.scan_angle)?;
            c.write_u8(self.user_data)?;
            c.write_u16::<LittleEndian>(self.point_source_id)?;
            c.write_f64::<LittleEndian>(self.gps_time)?;
            c.write_u16::<LittleEndian>(self.red)?;
            c.write_u16::<LittleEndian>(self.green)?;
            c.write_u16::<LittleEndian>(self.blue)?;
            c.write_u16::<LittleEndian>(self.nir)?;
        }
        w.write_all(&buf)?;
        Ok(())
    }

    /// Deserializes one point; the field order must mirror `write` exactly.
    pub fn read<R: std::io::Read>(r: &mut R) -> Result<Self> {
        let mut buf = [0u8; Self::BYTE_SIZE];
        r.read_exact(&mut buf)?;
        let mut c = std::io::Cursor::new(&buf[..]);
        Ok(RawPoint {
            x: c.read_i32::<LittleEndian>()?,
            y: c.read_i32::<LittleEndian>()?,
            z: c.read_i32::<LittleEndian>()?,
            intensity: c.read_u16::<LittleEndian>()?,
            return_number: c.read_u8()?,
            number_of_returns: c.read_u8()?,
            classification: c.read_u8()?,
            scan_angle: c.read_i16::<LittleEndian>()?,
            user_data: c.read_u8()?,
            point_source_id: c.read_u16::<LittleEndian>()?,
            gps_time: c.read_f64::<LittleEndian>()?,
            red: c.read_u16::<LittleEndian>()?,
            green: c.read_u16::<LittleEndian>()?,
            blue: c.read_u16::<LittleEndian>()?,
            nir: c.read_u16::<LittleEndian>()?,
        })
    }

    /// Serializes a whole slice through one heap buffer and a single
    /// `write_all` — the bulk counterpart of `write`, same per-point layout.
    pub fn write_bulk<W: std::io::Write>(points: &[RawPoint], w: &mut W) -> Result<()> {
        let mut buf = vec![0u8; Self::BYTE_SIZE * points.len()];
        let mut c = std::io::Cursor::new(&mut buf[..]);
        for p in points {
            c.write_i32::<LittleEndian>(p.x)?;
            c.write_i32::<LittleEndian>(p.y)?;
            c.write_i32::<LittleEndian>(p.z)?;
            c.write_u16::<LittleEndian>(p.intensity)?;
            c.write_u8(p.return_number)?;
            c.write_u8(p.number_of_returns)?;
            c.write_u8(p.classification)?;
            c.write_i16::<LittleEndian>(p.scan_angle)?;
            c.write_u8(p.user_data)?;
            c.write_u16::<LittleEndian>(p.point_source_id)?;
            c.write_f64::<LittleEndian>(p.gps_time)?;
            c.write_u16::<LittleEndian>(p.red)?;
            c.write_u16::<LittleEndian>(p.green)?;
            c.write_u16::<LittleEndian>(p.blue)?;
            c.write_u16::<LittleEndian>(p.nir)?;
        }
        w.write_all(&buf)?;
        Ok(())
    }
}
use crate::TempCompression;
/// Adapter that retries a single extra `read` after the inner reader reports
/// end-of-stream.
///
/// In this file it wraps `lz4_flex::frame::FrameDecoder`, which returns 0 at
/// the end of each compressed frame; the retry lets a file of concatenated
/// frames be consumed as one continuous stream.
struct MultiFrameReader<R: std::io::Read> {
    inner: R,
}

impl<R: std::io::Read> std::io::Read for MultiFrameReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        let n = self.inner.read(buf)?;
        if n > 0 {
            return Ok(n);
        }
        // Inner reader hit a frame boundary; poke it once more in case the
        // next frame begins here. A second 0 is genuine EOF.
        self.inner.read(buf)
    }
}
/// Writes one batch of points to `w` as a `u32` little-endian count followed by
/// the serialized points, optionally wrapped in a single LZ4 frame.
pub(crate) fn write_temp_batch<W: Write>(
    w: &mut W,
    points: &[RawPoint],
    codec: TempCompression,
) -> Result<()> {
    let count = points.len() as u32;
    match codec {
        TempCompression::None => {
            w.write_u32::<LittleEndian>(count)?;
            RawPoint::write_bulk(points, w)?;
            Ok(())
        }
        TempCompression::Lz4 => {
            // One complete frame per batch; MultiFrameReader stitches them
            // back together on the read side.
            let mut encoder = lz4_flex::frame::FrameEncoder::new(w);
            encoder.write_u32::<LittleEndian>(count)?;
            RawPoint::write_bulk(points, &mut encoder)?;
            encoder.finish().context("finishing lz4 frame")?;
            Ok(())
        }
    }
}
/// Reads every batch from `r` (as written by `write_temp_batch`) into one
/// vector, decoding with `codec`.
pub(crate) fn read_temp_batches<R: std::io::Read>(
    r: R,
    codec: TempCompression,
) -> Result<Vec<RawPoint>> {
    match codec {
        TempCompression::None => {
            let mut reader = BufReader::new(r);
            read_batches_loop(&mut reader)
        }
        TempCompression::Lz4 => {
            let decoder = lz4_flex::frame::FrameDecoder::new(BufReader::new(r));
            let mut frames = MultiFrameReader { inner: decoder };
            read_batches_loop(&mut frames)
        }
    }
}
/// Collects every point of a length-prefixed batch stream into a vector.
fn read_batches_loop<R: std::io::Read>(r: &mut R) -> Result<Vec<RawPoint>> {
    let mut points = Vec::new();
    for_each_point_in_batches(r, |point| {
        points.push(point);
        Ok(())
    })?;
    Ok(points)
}
/// Invokes `f` for every point in a stream of `u32`-count-prefixed batches.
///
/// A clean EOF while reading a batch header ends the stream normally; EOF in
/// the middle of a batch surfaces as an error from `RawPoint::read`.
fn for_each_point_in_batches<R: std::io::Read, F: FnMut(RawPoint) -> Result<()>>(
    r: &mut R,
    mut f: F,
) -> Result<()> {
    loop {
        let count = match r.read_u32::<LittleEndian>() {
            Ok(n) => n as usize,
            Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(()),
            Err(e) => return Err(e.into()),
        };
        for _ in 0..count {
            let point = RawPoint::read(r)?;
            f(point)?;
        }
    }
}
/// Streams every point of a temp-file reader through `f`, decoding batches
/// with `codec` without materializing them all in memory.
fn stream_temp_batches<R: std::io::Read, F: FnMut(RawPoint) -> Result<()>>(
    r: R,
    codec: TempCompression,
    f: F,
) -> Result<()> {
    match codec {
        TempCompression::Lz4 => {
            let decoder = lz4_flex::frame::FrameDecoder::new(BufReader::new(r));
            let mut frames = MultiFrameReader { inner: decoder };
            for_each_point_in_batches(&mut frames, f)
        }
        TempCompression::None => {
            let mut reader = BufReader::new(r);
            for_each_point_in_batches(&mut reader, f)
        }
    }
}
pub(crate) fn count_temp_file_points(path: &Path, codec: TempCompression) -> Result<u64> {
let f = match File::open(path) {
Ok(f) => f,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(0),
Err(e) => return Err(e.into()),
};
match codec {
TempCompression::None => {
use std::io::{Seek, SeekFrom};
let mut f = f;
let mut total: u64 = 0;
loop {
let count = match f.read_u32::<LittleEndian>() {
Ok(n) => n as u64,
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
};
total += count;
let payload = count * RawPoint::BYTE_SIZE as u64;
f.seek(SeekFrom::Current(payload as i64))?;
}
Ok(total)
}
TempCompression::Lz4 => {
let dec = lz4_flex::frame::FrameDecoder::new(BufReader::new(f));
let mut mf = MultiFrameReader { inner: dec };
let mut total: u64 = 0;
let mut sink = [0u8; 4096];
loop {
let count = match mf.read_u32::<LittleEndian>() {
Ok(n) => n as u64,
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
};
total += count;
let mut remaining = count * RawPoint::BYTE_SIZE as u64;
while remaining > 0 {
let want = remaining.min(sink.len() as u64) as usize;
mf.read_exact(&mut sink[..want])?;
remaining -= want as u64;
}
}
Ok(total)
}
}
}
/// Axis-aligned bounding box in world coordinates.
#[derive(Debug, Clone)]
pub struct Bounds {
    pub min_x: f64,
    pub min_y: f64,
    pub min_z: f64,
    pub max_x: f64,
    pub max_y: f64,
    pub max_z: f64,
}

impl Bounds {
    /// An inverted box (mins at `f64::MAX`, maxes at `f64::MIN`) that any real
    /// point or box will collapse onto.
    pub fn empty() -> Self {
        Bounds {
            min_x: f64::MAX,
            min_y: f64::MAX,
            min_z: f64::MAX,
            max_x: f64::MIN,
            max_y: f64::MIN,
            max_z: f64::MIN,
        }
    }

    /// Grows the box just enough to contain the point `(x, y, z)`.
    pub fn expand_with(&mut self, x: f64, y: f64, z: f64) {
        self.min_x = self.min_x.min(x);
        self.min_y = self.min_y.min(y);
        self.min_z = self.min_z.min(z);
        self.max_x = self.max_x.max(x);
        self.max_y = self.max_y.max(y);
        self.max_z = self.max_z.max(z);
    }

    /// Grows the box to contain `other` as well.
    pub fn merge(&mut self, other: &Bounds) {
        self.min_x = self.min_x.min(other.min_x);
        self.min_y = self.min_y.min(other.min_y);
        self.min_z = self.min_z.min(other.min_z);
        self.max_x = self.max_x.max(other.max_x);
        self.max_y = self.max_y.max(other.max_y);
        self.max_z = self.max_z.max(other.max_z);
    }

    /// Returns `(center_x, center_y, center_z, halfsize)` of a cube enclosing
    /// this box, padded by the largest scale value plus a 0.01% margin so
    /// boundary points fall strictly inside.
    pub fn to_cube(&self, scale_x: f64, scale_y: f64, scale_z: f64) -> (f64, f64, f64, f64) {
        let cx = (self.min_x + self.max_x) / 2.0;
        let cy = (self.min_y + self.max_y) / 2.0;
        let cz = (self.min_z + self.max_z) / 2.0;
        let scale_pad = scale_x.max(scale_y).max(scale_z);
        let extent = (self.max_x - self.min_x)
            .max(self.max_y - self.min_y)
            .max(self.max_z - self.min_z);
        let half = (extent / 2.0 + scale_pad) * 1.0001;
        (cx, cy, cz, half)
    }
}
/// Descends `depth` octree levels from the root cube centered at
/// `(cx, cy, cz)` with half-width `halfsize`, returning the voxel key that
/// contains the world-space point `(x, y, z)`.
///
/// A point exactly on a splitting plane goes to the upper (`>=`) child.
#[allow(clippy::too_many_arguments)]
pub fn point_to_key(
    x: f64,
    y: f64,
    z: f64,
    cx: f64,
    cy: f64,
    cz: f64,
    halfsize: f64,
    depth: u32,
) -> VoxelKey {
    /// One level of descent along a single axis: doubles the voxel coordinate,
    /// sets its low bit when the point lies at or above the split plane, and
    /// shifts the plane into the chosen half.
    fn step(coord: f64, origin: &mut f64, voxel: &mut i32, half: f64) {
        if coord >= *origin {
            *voxel = *voxel * 2 + 1;
            *origin += half;
        } else {
            *voxel *= 2;
            *origin -= half;
        }
    }
    let (mut vx, mut vy, mut vz) = (0i32, 0i32, 0i32);
    let (mut ox, mut oy, mut oz) = (cx, cy, cz);
    let mut half = halfsize;
    for _ in 0..depth {
        half /= 2.0;
        step(x, &mut ox, &mut vx, half);
        step(y, &mut oy, &mut vy, half);
        step(z, &mut oz, &mut vz, half);
    }
    VoxelKey {
        level: depth as i32,
        x: vx,
        y: vy,
        z: vz,
    }
}
/// Maps a LAS input point-format id to the point format used for COPC output:
/// ids 2/3/5/7 become 7, ids 8/10 become 8, everything else becomes 6.
pub fn input_to_copc_format(id: u8) -> u8 {
    match id {
        8 | 10 => 8,
        2 | 3 | 5 | 7 => 7,
        _ => 6,
    }
}
/// Per-input-file header summary collected by `OctreeBuilder::scan`.
pub struct ScanResult {
    pub bounds: Bounds,
    pub point_count: u64,
    // LAS coordinate transform of this file: world = int * scale + offset.
    pub scale_x: f64,
    pub scale_y: f64,
    pub scale_z: f64,
    pub offset_x: f64,
    pub offset_y: f64,
    pub offset_z: f64,
    // Hash of the file's WKT CRS VLR payload, if it has one.
    pub wkt_crs_hash: Option<u64>,
    pub point_format_id: u8,
}
/// Aggregate scan result: one `ScanResult` per input file plus the first WKT
/// CRS payload encountered (input order), used as the canonical CRS.
pub struct ScanOutput {
    pub results: Vec<ScanResult>,
    pub canonical_wkt: Option<Vec<u8>>,
}
/// 64-bit hash of raw WKT CRS bytes, used to compare CRS payloads between
/// input files. Stable within a run; not guaranteed stable across Rust
/// versions (it uses the std `DefaultHasher`).
fn wkt_hash(bytes: &[u8]) -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::Hasher;
    // DefaultHasher::new() is documented to equal the hasher produced by
    // BuildHasherDefault::<DefaultHasher>, which the original code used.
    let mut hasher = DefaultHasher::new();
    hasher.write(bytes);
    hasher.finish()
}
/// Drives octree construction: holds the global cube geometry, the shared
/// scale/offset transform, temp storage, and the chunked build plan.
pub struct OctreeBuilder {
    pub bounds: Bounds,
    pub total_points: u64,
    // Center and half edge length of the root cube enclosing `bounds`.
    pub cx: f64,
    pub cy: f64,
    pub cz: f64,
    pub halfsize: f64,
    // Transform shared by all stored points: world = int * scale + offset.
    pub scale_x: f64,
    pub scale_y: f64,
    pub scale_z: f64,
    pub offset_x: f64,
    pub offset_y: f64,
    pub offset_z: f64,
    // Per-process scratch directory holding chunk files and node data.
    pub tmp_dir: PathBuf,
    // Raw WKT CRS payload carried through from the scan, if any input had one.
    pub wkt_crs: Option<Vec<u8>>,
    pub point_format: u8,
    // Set by `distribute`; consumed by `build_node_map`.
    pub(crate) chunked_plan: Option<crate::chunking::ChunkPlan>,
    pub(crate) temp_compression: crate::TempCompression,
    pub(crate) node_store: Arc<dyn NodeStore>,
}
impl OctreeBuilder {
/// Reads the header of every input file in parallel, returning per-file
/// summaries plus the first WKT CRS payload found (in `input_files` order).
pub fn scan(input_files: &[PathBuf], config: &PipelineConfig) -> Result<ScanOutput> {
    // Shared progress counter across rayon workers.
    let done = std::sync::atomic::AtomicU64::new(0);
    let per_file: Result<Vec<(ScanResult, Option<Vec<u8>>)>> = input_files
        .par_iter()
        .map(|path| -> Result<(ScanResult, Option<Vec<u8>>)> {
            debug!("Scanning {:?}", path);
            let reader = las::Reader::from_path(path)
                .with_context(|| format!("Cannot open {:?}", path))?;
            let hdr = reader.header();
            let b = hdr.bounds();
            let mut bounds = Bounds::empty();
            bounds.expand_with(b.min.x, b.min.y, b.min.z);
            bounds.expand_with(b.max.x, b.max.y, b.max.z);
            let t = hdr.transforms();
            let n = done.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
            config.report(crate::ProgressEvent::StageProgress { done: n });
            // Keep the raw WKT VLR payload so one file's CRS can become canonical.
            let wkt_bytes: Option<Vec<u8>> = hdr
                .all_vlrs()
                .find(|v| v.is_wkt_crs())
                .map(|v| v.data.clone());
            let wkt_crs_hash = wkt_bytes.as_deref().map(wkt_hash);
            Ok((
                ScanResult {
                    bounds,
                    point_count: hdr.number_of_points(),
                    scale_x: t.x.scale,
                    scale_y: t.y.scale,
                    scale_z: t.z.scale,
                    offset_x: t.x.offset,
                    offset_y: t.y.offset,
                    offset_z: t.z.offset,
                    wkt_crs_hash,
                    point_format_id: hdr.point_format().to_u8().unwrap_or(0),
                },
                wkt_bytes,
            ))
        })
        .collect();
    let per_file = per_file?;
    let mut canonical_wkt: Option<Vec<u8>> = None;
    let mut results: Vec<ScanResult> = Vec::with_capacity(per_file.len());
    for (sr, wkt) in per_file {
        // First file (in input order) with a WKT CRS wins.
        if canonical_wkt.is_none()
            && let Some(bytes) = wkt
        {
            canonical_wkt = Some(bytes);
        }
        results.push(sr);
    }
    Ok(ScanOutput {
        results,
        canonical_wkt,
    })
}
/// Builds an `OctreeBuilder` from scan summaries: merged bounds, enclosing
/// root cube, temp directory and the configured node store.
///
/// NOTE(review): indexes `scan_results[0]` for scale/offset, so an empty
/// slice panics — presumably guaranteed non-empty by upstream validation;
/// TODO confirm.
pub fn from_scan(
    scan_results: &[ScanResult],
    validated: &crate::validate::ValidatedInputs,
    config: &PipelineConfig,
) -> Result<Self> {
    let mut bounds = Bounds::empty();
    let mut total_points = 0u64;
    for r in scan_results {
        bounds.merge(&r.bounds);
        total_points += r.point_count;
    }
    // Scale/offset are taken verbatim from the first file.
    let first = &scan_results[0];
    let (scale_x, scale_y, scale_z) = (first.scale_x, first.scale_y, first.scale_z);
    let (offset_x, offset_y, offset_z) = (first.offset_x, first.offset_y, first.offset_z);
    let (cx, cy, cz, halfsize) = bounds.to_cube(scale_x, scale_y, scale_z);
    // Smallest depth whose 8^d leaves average at most MAX_LEAF_POINTS points
    // (capped at 17, floored at 1). Used here only for the debug log.
    let depth = {
        let mut d = 0u32;
        while (total_points as f64) / (8u64.pow(d) as f64) > MAX_LEAF_POINTS as f64 {
            d += 1;
            if d > 16 {
                break;
            }
        }
        d.max(1)
    };
    debug!("Octree depth = {depth}, total points = {total_points}");
    // Per-process temp dir; remove a stale one left by a previous run that
    // happened to reuse this pid.
    let sys_tmp = std::env::temp_dir();
    let base_tmp = config.temp_dir.as_deref().unwrap_or(&sys_tmp);
    let tmp_dir = base_tmp.join(format!("copc_{}", std::process::id()));
    if tmp_dir.exists() {
        info!("Removing stale temp dir {:?}", tmp_dir);
        std::fs::remove_dir_all(&tmp_dir)?;
    }
    std::fs::create_dir_all(&tmp_dir)?;
    let node_store: Arc<dyn NodeStore> = match config.node_storage {
        crate::NodeStorage::Files => {
            Arc::new(FileNodeStore::new(tmp_dir.clone(), config.temp_compression))
        }
        crate::NodeStorage::Packed => Arc::new(PackedNodeStore::new(
            &tmp_dir,
            config.temp_compression,
            rayon::current_num_threads().max(1),
        )?),
    };
    Ok(OctreeBuilder {
        bounds,
        total_points,
        cx,
        cy,
        cz,
        halfsize,
        scale_x,
        scale_y,
        scale_z,
        offset_x,
        offset_y,
        offset_z,
        tmp_dir,
        wkt_crs: validated.wkt_crs.clone(),
        point_format: validated.point_format,
        chunked_plan: None,
        temp_compression: config.temp_compression,
        node_store,
    })
}
/// Returns the number of points currently stored for `key` in the node store.
pub(crate) fn count_node(&self, key: &VoxelKey) -> Result<u64> {
    self.node_store.count(key)
}
/// Converts a decoded `las` point into integer (scaled) `RawPoint` form using
/// the builder's global scale/offset transform.
fn convert_point(&self, p: &las::Point) -> RawPoint {
    // Invert world = int * scale + offset; `as i32` saturates on overflow.
    let ix = ((p.x - self.offset_x) / self.scale_x).round() as i32;
    let iy = ((p.y - self.offset_y) / self.scale_y).round() as i32;
    let iz = ((p.z - self.offset_z) / self.scale_z).round() as i32;
    RawPoint {
        x: ix,
        y: iy,
        z: iz,
        intensity: p.intensity,
        return_number: p.return_number,
        number_of_returns: p.number_of_returns,
        classification: p.classification.into(),
        // Degrees -> 0.006-degree increments (LAS extended scan angle units).
        scan_angle: (p.scan_angle / 0.006).round() as i16,
        user_data: p.user_data,
        point_source_id: p.point_source_id,
        // Optional attributes absent from the input format default to zero.
        gps_time: p.gps_time.unwrap_or(0.0),
        red: p.color.as_ref().map(|c| c.red).unwrap_or(0),
        green: p.color.as_ref().map(|c| c.green).unwrap_or(0),
        blue: p.color.as_ref().map(|c| c.blue).unwrap_or(0),
        nir: p.nir.unwrap_or(0),
    }
}
/// Reads back every point stored for `key` from the node store.
pub fn read_node(&self, key: &VoxelKey) -> Result<Vec<RawPoint>> {
    self.node_store.read(key)
}
/// Stores `points` as the contents of node `key` in the node store.
pub fn write_node_to_temp(&self, key: &VoxelKey, points: &[RawPoint]) -> Result<()> {
    self.node_store.write(key, points)
}
/// Collapses an in-memory node map level by level, from `actual_max_depth`
/// down to `min_level`: at each step a sampled subset of every parent's child
/// points is promoted into the parent. Finally every surviving node is
/// persisted to the node store.
///
/// Returns `(key, point_count)` for each non-empty node written.
fn bottom_up_levels(
    &self,
    mut nodes: HashMap<VoxelKey, Vec<RawPoint>>,
    actual_max_depth: u32,
    min_level: u32,
    report_progress: bool,
    config: &crate::PipelineConfig,
) -> Result<Vec<(VoxelKey, usize)>> {
    // d is the parent level being produced; its children sit at d + 1.
    for d in (min_level..actual_max_depth).rev() {
        if report_progress {
            config.report(crate::ProgressEvent::StageProgress {
                done: (actual_max_depth - d) as u64,
            });
        }
        let level_points: usize = nodes
            .iter()
            .filter(|(k, _)| k.level as u32 == d + 1)
            .map(|(_, v)| v.len())
            .sum();
        debug!(
            "In-memory level {d}: {} total points at child level, {} nodes in map ({} MB est)",
            level_points,
            nodes.len(),
            (nodes.values().map(|v| v.len()).sum::<usize>() * std::mem::size_of::<RawPoint>())
                / 1_048_576,
        );
        // Group child keys at level d + 1 under their level-d parents.
        let mut parent_children: HashMap<VoxelKey, Vec<VoxelKey>> = HashMap::new();
        for k in nodes.keys() {
            if k.level as u32 == d + 1
                && let Some(p) = k.parent()
            {
                parent_children.entry(p).or_default().push(*k);
            }
        }
        if parent_children.is_empty() {
            continue;
        }
        // Move child points out of the map, tagging each with its child index
        // so sampling leftovers can be returned to the right child.
        let tasks: Vec<SampleTask> = parent_children
            .into_iter()
            .map(|(parent, children)| {
                let all_pts: Vec<(usize, RawPoint)> = children
                    .iter()
                    .enumerate()
                    .flat_map(|(ci, ck)| {
                        nodes
                            .remove(ck)
                            .unwrap_or_default()
                            .into_iter()
                            .map(move |p| (ci, p))
                    })
                    .collect();
                (parent, children, all_pts)
            })
            .collect();
        // Sample each parent in parallel; grid_sample decides which points
        // move up and which stay with their child.
        let results: Vec<SampleResult> = tasks
            .into_par_iter()
            .map(|(parent, children, all_pts)| -> Result<_> {
                if all_pts.is_empty() {
                    let n = children.len();
                    return Ok((parent, children, vec![], vec![vec![]; n]));
                }
                let n = children.len();
                let (parent_pts, remaining) = self.grid_sample(&parent, all_pts, n);
                Ok((parent, children, parent_pts, remaining))
            })
            .collect::<Result<_>>()?;
        // Re-insert non-empty children and the newly populated parents.
        for (parent, children, parent_pts, remaining) in results {
            for (ck, rem) in children.into_iter().zip(remaining) {
                if !rem.is_empty() {
                    nodes.insert(ck, rem);
                }
            }
            if !parent_pts.is_empty() {
                nodes.insert(parent, parent_pts);
            }
        }
    }
    // Persist everything that survived the collapse.
    nodes
        .par_iter()
        .map(|(k, pts)| -> Result<()> { self.write_node_to_temp(k, pts) })
        .collect::<Result<Vec<_>>>()?;
    Ok(nodes
        .iter()
        .filter(|(_, pts)| !pts.is_empty())
        .map(|(k, pts)| (*k, pts.len()))
        .collect())
}
/// Selects a spatially even subset of `pts` (each tagged with the index of the
/// child it came from) to promote into `parent`, using an occupancy grid of
/// `GRID_CELLS_PER_AXIS³` cells over the parent voxel.
///
/// Returns `(promoted_points, leftover_points_per_child)`. Points are visited
/// in Morton order, at most one point per occupied grid cell is accepted, and
/// any child that contributed points but lost them all gets one point back.
fn grid_sample(
    &self,
    parent: &VoxelKey,
    mut pts: Vec<(usize, RawPoint)>,
    n_children: usize,
) -> (Vec<RawPoint>, Vec<Vec<RawPoint>>) {
    if pts.is_empty() {
        return (vec![], vec![vec![]; n_children]);
    }
    // Edge length of the parent voxel in world units.
    let voxel_size_world = 2.0 * self.halfsize / (1u64 << parent.level) as f64;
    // Minimum corner of the parent voxel in integer (scaled) coordinates.
    let origin_x = ((self.cx - self.halfsize + parent.x as f64 * voxel_size_world
        - self.offset_x)
        / self.scale_x)
        .round() as i64;
    let origin_y = ((self.cy - self.halfsize + parent.y as f64 * voxel_size_world
        - self.offset_y)
        / self.scale_y)
        .round() as i64;
    let origin_z = ((self.cz - self.halfsize + parent.z as f64 * voxel_size_world
        - self.offset_z)
        / self.scale_z)
        .round() as i64;
    // Voxel edge in integer units (finest axis scale), split into grid cells.
    let int_size =
        (voxel_size_world / self.scale_x.min(self.scale_y).min(self.scale_z)).round() as i64;
    let cell = (int_size / GRID_CELLS_PER_AXIS).max(1);
    // Morton order makes the accepted subset spatially well distributed.
    pts.sort_unstable_by_key(|(_, p)| {
        let dx = (p.x as i64 - origin_x).max(0) as u32;
        let dy = (p.y as i64 - origin_y).max(0) as u32;
        let dz = (p.z as i64 - origin_z).max(0) as u32;
        morton3(dx, dy, dz)
    });
    let grid_key = |p: &RawPoint| -> (i32, i32, i32) {
        (
            ((p.x as i64 - origin_x) / cell) as i32,
            ((p.y as i64 - origin_y) / cell) as i32,
            ((p.z as i64 - origin_z) / cell) as i32,
        )
    };
    let mut child_has_pts = vec![false; n_children];
    for (ci, _) in &pts {
        child_has_pts[*ci] = true;
    }
    let mut occupied: HashSet<(i32, i32, i32)> = HashSet::new();
    let max_accepted =
        (GRID_CELLS_PER_AXIS * GRID_CELLS_PER_AXIS * GRID_CELLS_PER_AXIS) as usize;
    let mut parent_pts: Vec<(usize, RawPoint)> = Vec::with_capacity(max_accepted);
    let mut remaining: Vec<Vec<RawPoint>> = vec![Vec::new(); n_children];
    // First point to claim a grid cell is promoted; all others stay behind.
    for (ci, p) in pts {
        if parent_pts.len() < max_accepted && occupied.insert(grid_key(&p)) {
            parent_pts.push((ci, p));
        } else {
            remaining[ci].push(p);
        }
    }
    // Give one promoted point back to any child that would otherwise end up
    // empty even though it contributed points.
    for ci in 0..n_children {
        if child_has_pts[ci]
            && remaining[ci].is_empty()
            && let Some(pos) = parent_pts.iter().rposition(|(c, _)| *c == ci)
        {
            let (_, p) = parent_pts.remove(pos);
            remaining[ci].push(p);
        }
    }
    let parent_pts = parent_pts.into_iter().map(|(_, p)| p).collect();
    (parent_pts, remaining)
}
/// Streams every input file once, routing each point into its planned chunk's
/// temp file via per-worker shard directories, then merges the shards into
/// canonical per-chunk files and stores the chunk plan on `self`.
pub fn distribute(&mut self, input_files: &[PathBuf], config: &PipelineConfig) -> Result<()> {
    config.report(crate::ProgressEvent::StageStart {
        name: "Counting",
        total: self.total_points,
    });
    let plan = crate::chunking::compute_chunk_plan(
        self,
        input_files,
        config,
        config.chunk_target_override,
    )?;
    config.report(crate::ProgressEvent::StageDone);
    info!(
        "Distribute: {} chunks, target {} points each, grid {}³",
        plan.chunks.len(),
        plan.chunk_target,
        plan.grid_size
    );
    // Grid-cell -> chunk-index lookup, shared read-only across workers.
    let lut = build_chunk_lut(&plan);
    let chunks_root = self.tmp_dir.join("chunks");
    if chunks_root.exists() {
        std::fs::remove_dir_all(&chunks_root)
            .with_context(|| format!("removing stale chunks dir {:?}", chunks_root))?;
    }
    std::fs::create_dir_all(&chunks_root)?;
    // Each worker appends only into its own shard dir, so no file is ever
    // written by two threads.
    let shards_root = chunks_root.join("shards");
    std::fs::create_dir_all(&shards_root)?;
    let num_workers = chunked_distribute_worker_count(input_files.len());
    let shard_dirs: Vec<PathBuf> = (0..num_workers)
        .map(|w| {
            let d = shards_root.join(w.to_string());
            std::fs::create_dir_all(&d)?;
            Ok(d)
        })
        .collect::<Result<Vec<_>>>()?;
    // Split the global open-file budget across workers, with a floor.
    let per_worker_cap = (CHUNKED_OPEN_FILES_CAP / num_workers).max(MIN_PER_WORKER_CHUNK_FILES);
    // Rough transient memory cost per in-flight point, used to size read batches.
    const BYTES_PER_POINT_TRANSIENT: u64 = 120;
    let per_worker_budget = config.memory_budget / num_workers as u64;
    let read_chunk_size =
        ((per_worker_budget / 8) / BYTES_PER_POINT_TRANSIENT).clamp(50_000, 2_000_000) as usize;
    debug!(
        "Distribute: budget={} MB, workers={}, read_chunk_size={}, open-file cap={} (per worker)",
        config.memory_budget / 1_048_576,
        num_workers,
        read_chunk_size,
        per_worker_cap
    );
    let g = plan.grid_size;
    let grid_depth = plan.grid_depth;
    let g_usize = g as usize;
    // Static assignment: worker w handles a contiguous slice of input files.
    let files_per_worker = input_files.len().div_ceil(num_workers);
    let progress = AtomicU64::new(0);
    config.report(crate::ProgressEvent::StageStart {
        name: "Distributing",
        total: self.total_points,
    });
    shard_dirs
        .par_iter()
        .enumerate()
        .try_for_each(|(worker_id, shard_dir)| -> Result<()> {
            let start = worker_id * files_per_worker;
            let end = (start + files_per_worker).min(input_files.len());
            if start >= end {
                return Ok(());
            }
            let mut cache = ChunkWriterCache::new(per_worker_cap, self.temp_compression);
            let mut points: Vec<las::Point> = Vec::with_capacity(read_chunk_size);
            // Reused per-batch grouping of converted points by target chunk.
            let mut groups: HashMap<u32, Vec<RawPoint>> = HashMap::new();
            for path in &input_files[start..end] {
                let mut reader = las::Reader::from_path(path)
                    .with_context(|| format!("Cannot open {:?}", path))?;
                let file_pts = reader.header().number_of_points();
                debug!(
                    "Distribute worker {} file {:?}: {} points",
                    worker_id,
                    path.file_name().unwrap_or_default(),
                    file_pts
                );
                loop {
                    points.clear();
                    let n = reader.read_points_into(read_chunk_size as u64, &mut points)?;
                    if n == 0 {
                        break;
                    }
                    for p in &points {
                        let raw = self.convert_point(p);
                        // World coords are recomputed from the quantized
                        // integers so the key matches what later stages
                        // derive from stored points.
                        let wx = raw.x as f64 * self.scale_x + self.offset_x;
                        let wy = raw.y as f64 * self.scale_y + self.offset_y;
                        let wz = raw.z as f64 * self.scale_z + self.offset_z;
                        let key = point_to_key(
                            wx,
                            wy,
                            wz,
                            self.cx,
                            self.cy,
                            self.cz,
                            self.halfsize,
                            grid_depth,
                        );
                        let gx = (key.x as usize).min(g_usize - 1);
                        let gy = (key.y as usize).min(g_usize - 1);
                        let gz = (key.z as usize).min(g_usize - 1);
                        let cell_idx = gx + gy * g_usize + gz * g_usize * g_usize;
                        let chunk_idx = lut[cell_idx];
                        groups.entry(chunk_idx).or_default().push(raw);
                    }
                    points.clear();
                    for (chunk_idx, pts) in groups.drain() {
                        cache.append(chunk_idx, shard_dir, &pts)?;
                    }
                    let done = progress.fetch_add(n, Ordering::Relaxed) + n;
                    config.report(crate::ProgressEvent::StageProgress { done });
                }
            }
            cache.flush_all()
        })?;
    merge_chunk_shards(&shards_root, &chunks_root, plan.chunks.len() as u32)?;
    config.report(crate::ProgressEvent::StageDone);
    self.chunked_plan = Some(plan);
    Ok(())
}
/// Streams every point of canonical chunk file `chunk_idx` through `f`.
fn stream_chunk_file<F: FnMut(RawPoint) -> Result<()>>(
    &self,
    chunk_idx: u32,
    f: F,
) -> Result<()> {
    let chunks_dir = self.tmp_dir.join("chunks");
    let path = chunk_canonical_path(&chunks_dir, chunk_idx);
    let file = File::open(&path).with_context(|| format!("opening chunk file {:?}", path))?;
    stream_temp_batches(file, self.temp_compression, f)
}
/// Builds the subtree for one planned chunk entirely in memory: bins the
/// chunk's points into leaf nodes, splits oversized leaves deeper (up to a
/// hard depth cap), then collapses levels back up to the chunk root via
/// `bottom_up_levels`.
///
/// Returns `(key, point_count)` for every node written; an empty chunk file
/// yields an empty list.
fn build_chunk_in_memory(
    &self,
    chunk: &crate::chunking::PlannedChunk,
    chunk_idx: u32,
    config: &crate::PipelineConfig,
) -> Result<Vec<(VoxelKey, usize)>> {
    // Initial leaf depth estimated from the chunk's planned point count.
    let extra_levels = chunk_local_extra_levels(chunk.point_count);
    let leaf_depth = chunk.level + extra_levels;
    debug!(
        "Chunk {} (L{} {},{},{}): ~{} points → leaf depth {} (extra {})",
        chunk_idx,
        chunk.level,
        chunk.gx,
        chunk.gy,
        chunk.gz,
        chunk.point_count,
        leaf_depth,
        extra_levels
    );
    let mut leaves: HashMap<VoxelKey, Vec<RawPoint>> = HashMap::new();
    let mut n_points: usize = 0;
    self.stream_chunk_file(chunk_idx, |raw| {
        n_points += 1;
        // World coords recomputed from the stored quantized integers.
        let wx = raw.x as f64 * self.scale_x + self.offset_x;
        let wy = raw.y as f64 * self.scale_y + self.offset_y;
        let wz = raw.z as f64 * self.scale_z + self.offset_z;
        let key = point_to_key(
            wx,
            wy,
            wz,
            self.cx,
            self.cy,
            self.cz,
            self.halfsize,
            leaf_depth,
        );
        // Debug builds verify the leaf really descends from this chunk's root.
        #[cfg(debug_assertions)]
        {
            let mut ancestor = key;
            while ancestor.level > chunk.level as i32 {
                ancestor = ancestor.parent().expect("leaf above chunk level");
            }
            debug_assert!(
                ancestor.level == chunk.level as i32
                    && ancestor.x == chunk.gx
                    && ancestor.y == chunk.gy
                    && ancestor.z == chunk.gz,
                "leaf {:?} does not belong to chunk root (L{} {},{},{}): ancestor at L{} = ({},{},{})",
                key,
                chunk.level,
                chunk.gx,
                chunk.gy,
                chunk.gz,
                ancestor.level,
                ancestor.x,
                ancestor.y,
                ancestor.z,
            );
        }
        leaves.entry(key).or_default().push(raw);
        Ok(())
    })?;
    if n_points == 0 {
        debug!(
            "Chunk {} (L{} {},{},{}) is empty, skipping",
            chunk_idx, chunk.level, chunk.gx, chunk.gy, chunk.gz
        );
        return Ok(Vec::new());
    }
    // Split any leaf over MAX_LEAF_POINTS one level at a time, re-keying its
    // points, until everything fits or the absolute depth cap is reached.
    const CHUNK_DEPTH_CAP: u32 = 20;
    let mut effective_max_depth = leaf_depth;
    loop {
        let oversized: Vec<VoxelKey> = leaves
            .iter()
            .filter(|(_, pts)| pts.len() as u64 > MAX_LEAF_POINTS)
            .map(|(k, _)| *k)
            .collect();
        if oversized.is_empty() {
            break;
        }
        // Stop subdividing entirely once any leaf would pass the cap.
        let hit_cap = oversized.iter().any(|k| k.level as u32 >= CHUNK_DEPTH_CAP);
        if hit_cap {
            debug!(
                "Chunk {}: stopping subdivision at depth cap {} \
                 ({} leaves still oversized)",
                chunk_idx,
                CHUNK_DEPTH_CAP,
                oversized.len()
            );
            break;
        }
        for key in oversized {
            let pts = leaves.remove(&key).expect("just listed");
            let new_level = (key.level as u32) + 1;
            if new_level > effective_max_depth {
                effective_max_depth = new_level;
            }
            // Re-key every point of the split leaf one level deeper.
            for raw in pts {
                let wx = raw.x as f64 * self.scale_x + self.offset_x;
                let wy = raw.y as f64 * self.scale_y + self.offset_y;
                let wz = raw.z as f64 * self.scale_z + self.offset_z;
                let new_key = point_to_key(
                    wx,
                    wy,
                    wz,
                    self.cx,
                    self.cy,
                    self.cz,
                    self.halfsize,
                    new_level,
                );
                leaves.entry(new_key).or_default().push(raw);
            }
        }
    }
    debug!(
        "Chunk {}: {} leaves after subdivision, effective depth {}",
        chunk_idx,
        leaves.len(),
        effective_max_depth
    );
    self.bottom_up_levels(leaves, effective_max_depth, chunk.level, false, config)
}
/// Merges the tops of the per-chunk subtrees: starting from the chunk root
/// nodes, repeatedly samples sibling groups up into their shared parents until
/// the octree root level, batching parents so each batch's estimated working
/// set stays inside the memory budget.
///
/// Returns every parent key created by the merge.
fn merge_chunk_tops(
    &self,
    chunk_root_keys: &[VoxelKey],
    config: &crate::PipelineConfig,
) -> Result<Vec<VoxelKey>> {
    if chunk_root_keys.is_empty() {
        return Ok(Vec::new());
    }
    let mut keys_by_level: HashMap<i32, HashSet<VoxelKey>> = HashMap::new();
    for k in chunk_root_keys {
        keys_by_level.entry(k.level).or_default().insert(*k);
    }
    let max_chunk_level: i32 = keys_by_level.keys().copied().max().unwrap_or(0);
    debug!(
        "Merge chunk tops: {} chunk roots, max_chunk_level={}",
        chunk_root_keys.len(),
        max_chunk_level
    );
    // Estimated resident bytes per point while a parent merge is in flight:
    // the tagged input copy plus the sampled output.
    const MEM_PER_POINT: u64 =
        (std::mem::size_of::<(usize, RawPoint)>() + std::mem::size_of::<RawPoint>()) as u64;
    let mut all_new_parents: Vec<VoxelKey> = Vec::new();
    // d is the parent level being produced; its children sit at d + 1.
    for d in (0..max_chunk_level).rev() {
        let child_level = d + 1;
        let child_keys: Vec<VoxelKey> = match keys_by_level.get(&child_level) {
            Some(v) => v.iter().copied().collect(),
            None => continue,
        };
        if child_keys.is_empty() {
            continue;
        }
        let mut parent_children: HashMap<VoxelKey, Vec<VoxelKey>> = HashMap::new();
        for ck in &child_keys {
            if let Some(parent) = ck.parent() {
                parent_children.entry(parent).or_default().push(*ck);
            }
        }
        if parent_children.is_empty() {
            continue;
        }
        // Partition parents by whether their estimated working set fits the
        // whole memory budget on its own.
        let mut small_parents: Vec<(VoxelKey, Vec<VoxelKey>, u64)> = Vec::new();
        let mut large_parents: Vec<(VoxelKey, Vec<VoxelKey>, u64)> = Vec::new();
        for (parent, children) in parent_children {
            let est_points: u64 = children
                .iter()
                .map(|ck| self.count_node(ck).unwrap_or(0))
                .sum();
            let est_mem = est_points * MEM_PER_POINT;
            if est_mem > config.memory_budget {
                large_parents.push((parent, children, est_mem));
            } else {
                small_parents.push((parent, children, est_mem));
            }
        }
        debug!(
            "Merge level {}→{}: {} parents ({} small, {} large), budget={} MB",
            child_level,
            d,
            small_parents.len() + large_parents.len(),
            small_parents.len(),
            large_parents.len(),
            config.memory_budget / 1_048_576,
        );
        // A single over-budget parent cannot be processed at all: fail loudly
        // instead of blowing past the budget.
        if let Some((parent, children, est_mem)) = large_parents.first() {
            return Err(anyhow::anyhow!(
                "merge parent {:?} has {} children with combined estimate {} MB, \
                exceeding memory budget {} MB. Raise --memory-limit or investigate \
                the chunk plan.",
                parent,
                children.len(),
                est_mem / 1_048_576,
                config.memory_budget / 1_048_576,
            ));
        }
        // Largest parents first so batches pack tightly under the budget.
        small_parents.sort_by_key(|p| std::cmp::Reverse(p.2));
        let mut batch_start = 0;
        while batch_start < small_parents.len() {
            // Greedily extend the batch while it fits the budget, always
            // taking at least one parent so progress is guaranteed.
            let mut batch_mem: u64 = 0;
            let mut batch_end = batch_start;
            while batch_end < small_parents.len() {
                if batch_end > batch_start
                    && batch_mem + small_parents[batch_end].2 > config.memory_budget
                {
                    break;
                }
                batch_mem += small_parents[batch_end].2;
                batch_end += 1;
            }
            let batch = &small_parents[batch_start..batch_end];
            debug!(
                " Merge batch: {} parents, est {} MB",
                batch.len(),
                batch_mem / 1_048_576,
            );
            // Each parent merge is independent: read children, sample, write back.
            batch
                .par_iter()
                .map(|(parent, children, _)| -> Result<()> {
                    let mut all_pts: Vec<(usize, RawPoint)> = Vec::new();
                    for (ci, ck) in children.iter().enumerate() {
                        for p in self.read_node(ck)? {
                            all_pts.push((ci, p));
                        }
                    }
                    if all_pts.is_empty() {
                        return Ok(());
                    }
                    let (parent_pts, per_child) =
                        self.grid_sample(parent, all_pts, children.len());
                    // Children are rewritten so the store reflects exactly the
                    // post-sampling contents.
                    for (ci, ck) in children.iter().enumerate() {
                        self.write_node_to_temp(ck, &per_child[ci])?;
                    }
                    if !parent_pts.is_empty() {
                        self.write_node_to_temp(parent, &parent_pts)?;
                    }
                    Ok(())
                })
                .collect::<Result<Vec<_>>>()?;
            // New parents join level d so the next iteration can merge them
            // further up.
            for (parent, _, _) in &small_parents[batch_start..batch_end] {
                all_new_parents.push(*parent);
                keys_by_level.entry(d).or_default().insert(*parent);
            }
            batch_start = batch_end;
        }
        config.report(crate::ProgressEvent::StageProgress {
            done: (max_chunk_level - d) as u64,
        });
    }
    Ok(all_new_parents)
}
/// Builds the full octree node map from the chunk plan produced earlier in
/// the pipeline.
///
/// Steps:
/// 1. Build each planned chunk's local subtree (`build_chunk_in_memory`),
///    batching chunks so concurrent work stays within `config.memory_budget`.
/// 2. Merge the chunk-root nodes upward across chunk boundaries
///    (`merge_chunk_tops`), producing additional parent nodes.
/// 3. Count the points stored for every node and drop empty ones.
/// 4. Re-insert zero-count ancestors so every node has a complete parent
///    chain up to the root.
///
/// Returns `(key, point_count)` pairs sorted by octree level (ascending, so
/// parents precede their descendants). Errors from chunk building, merging,
/// or counting are propagated.
pub fn build_node_map(&self, config: &crate::PipelineConfig) -> Result<Vec<(VoxelKey, usize)>> {
let plan = self
.chunked_plan
.as_ref()
.ok_or_else(|| anyhow::anyhow!("build_node_map called without a chunk plan"))?;
let n_chunks = plan.chunks.len();
if n_chunks == 0 {
info!("Build: no chunks (empty input?)");
return Ok(Vec::new());
}
info!(
"Build: {} chunks, max_chunk_level={}",
n_chunks,
plan.chunks.iter().map(|c| c.level).max().unwrap_or(0)
);
// Rough working-set estimate per point while a chunk subtree is held in
// memory. NOTE(review): 600 bytes/point looks empirical — confirm against
// profiling before tuning batch sizes.
const PER_CHUNK_BYTES_PER_POINT: u64 = 600;
let mut chunks_indexed: Vec<(u32, &crate::chunking::PlannedChunk)> = plan
.chunks
.iter()
.enumerate()
.map(|(i, c)| (i as u32, c))
.collect();
// Largest chunks first: they dominate batch memory, and starting them
// early avoids a long straggler at the end of the stage.
chunks_indexed.sort_by_key(|c| std::cmp::Reverse(c.1.point_count));
config.report(crate::ProgressEvent::StageStart {
name: "Building",
total: n_chunks as u64,
});
let chunks_done = AtomicU64::new(0);
let mut all_chunk_node_keys: Vec<VoxelKey> = Vec::new();
let mut chunk_root_keys: Vec<VoxelKey> = Vec::new();
let mut batch_start = 0;
// Greedy batching: grow each batch while the summed estimate fits the
// memory budget. The `batch_end > batch_start` guard guarantees at least
// one chunk per batch, so even an over-budget chunk still makes progress.
while batch_start < chunks_indexed.len() {
let mut batch_mem: u64 = 0;
let mut batch_end = batch_start;
while batch_end < chunks_indexed.len() {
let est_mem = chunks_indexed[batch_end].1.point_count * PER_CHUNK_BYTES_PER_POINT;
if batch_end > batch_start && batch_mem + est_mem > config.memory_budget {
break;
}
batch_mem += est_mem;
batch_end += 1;
}
let batch = &chunks_indexed[batch_start..batch_end];
debug!(
"Build batch: {} chunks, est {} MB",
batch.len(),
batch_mem / 1_048_576
);
// Build every chunk of the batch in parallel. rayon's indexed collect
// preserves input order, so `batch_results[i]` corresponds to `batch[i]`
// in the zip below.
let batch_results: Vec<Vec<(VoxelKey, usize)>> = batch
.par_iter()
.map(|(chunk_idx, chunk)| -> Result<Vec<(VoxelKey, usize)>> {
let nodes = self.build_chunk_in_memory(chunk, *chunk_idx, config)?;
let done = chunks_done.fetch_add(1, Ordering::Relaxed) + 1;
config.report(crate::ProgressEvent::StageProgress { done });
Ok(nodes)
})
.collect::<Result<_>>()?;
// Collect every produced key; nodes at the chunk's own level are the
// chunk roots the cross-chunk merge below starts from.
for ((_chunk_idx, chunk), nodes) in batch.iter().zip(batch_results) {
let chunk_level_i32 = chunk.level as i32;
for (k, _) in &nodes {
all_chunk_node_keys.push(*k);
if k.level == chunk_level_i32 {
chunk_root_keys.push(*k);
}
}
}
batch_start = batch_end;
}
debug!(
"Per-chunk build done: {} chunk nodes, {} chunk roots",
all_chunk_node_keys.len(),
chunk_root_keys.len()
);
// Merge the chunk tops upward; this creates the parent nodes that sit
// above the per-chunk subtrees.
let merge_parents = self.merge_chunk_tops(&chunk_root_keys, config)?;
debug!("Merge step produced {} parent nodes", merge_parents.len());
config.report(crate::ProgressEvent::StageDone);
let mut all_keys: Vec<VoxelKey> = all_chunk_node_keys;
all_keys.extend(merge_parents);
// Sort + dedup: merge parents can coincide with keys already produced by
// the per-chunk builds.
all_keys.sort_unstable_by_key(|k| (k.level, k.x, k.y, k.z));
all_keys.dedup();
// Count points per node in parallel; nodes whose storage ended up empty
// are dropped just below.
let mut result: Vec<(VoxelKey, usize)> = all_keys
.par_iter()
.map(|k| -> Result<(VoxelKey, usize)> {
let n = self.count_node(k)? as usize;
Ok((*k, n))
})
.collect::<Result<Vec<_>>>()?;
result.retain(|(_, count)| *count > 0);
let total_pts: usize = result.iter().map(|(_, c)| *c).sum();
info!(
"Build: {} nodes, {} total points (input: {})",
result.len(),
total_pts,
self.total_points
);
// A mismatch versus the input headers is logged but tolerated: LAZ
// headers sometimes report inaccurate point counts.
if total_pts as u64 != self.total_points {
debug!(
"COPC contains {} points vs {} from input headers (diff {}). \
Input LAZ headers sometimes report inaccurate point counts.",
total_pts,
self.total_points,
self.total_points as i64 - total_pts as i64
);
}
// Ensure every node's ancestor chain exists: walk each key up to the
// root and record any parent not already present with a zero count.
let mut present: HashSet<VoxelKey> = result.iter().map(|(k, _)| *k).collect();
let mut extra: Vec<VoxelKey> = Vec::new();
for (key, _) in &result {
let mut k = *key;
while let Some(parent) = k.parent() {
if present.insert(parent) {
extra.push(parent);
}
k = parent;
}
}
for k in extra {
result.push((k, 0));
}
// Stable sort by level only: parents come before children; within a
// level the earlier ordering is preserved.
result.sort_by_key(|(k, _)| k.level);
Ok(result)
}
}
impl Drop for OctreeBuilder {
    /// Removes the builder's temporary spill directory on teardown.
    fn drop(&mut self) {
        // Best-effort cleanup: `drop` cannot propagate errors, so a failed
        // removal (e.g. the directory was never created) is ignored.
        std::fs::remove_dir_all(&self.tmp_dir).ok();
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::chunking::{ChunkPlan, PlannedChunk};

    #[test]
    fn to_cube_pads_halfsize_by_one_scale_unit() {
        let mut bounds = Bounds::empty();
        bounds.expand_with(184000.0, 426417.72, -13.11);
        bounds.expand_with(184100.0, 426500.0, 44.23);
        let scale = 0.01;
        let (_, _, _, halfsize) = bounds.to_cube(scale, scale, scale);
        assert!(
            halfsize >= 50.0 + scale,
            "halfsize {halfsize} must add scale pad"
        );
        assert!(
            halfsize <= 50.05,
            "halfsize {halfsize} must stay close to half x-range"
        );
    }

    /// A point with extreme / sentinel values in every field, used to catch
    /// truncation or sign bugs in the (de)serialization paths.
    fn sample_point() -> RawPoint {
        RawPoint {
            x: -123456,
            y: 789012,
            z: -1,
            intensity: 65535,
            return_number: 3,
            number_of_returns: 5,
            classification: 6,
            scan_angle: -15000,
            user_data: 42,
            point_source_id: 1001,
            gps_time: 123456789.987654,
            red: 255,
            green: 0,
            blue: 65535,
            nir: 32768,
        }
    }

    #[test]
    fn rawpoint_roundtrip_single() {
        let original = sample_point();
        let mut encoded = Vec::new();
        original.write(&mut encoded).unwrap();
        assert_eq!(encoded.len(), RawPoint::BYTE_SIZE);
        let decoded = RawPoint::read(&mut &encoded[..]).unwrap();
        assert_eq!(original.x, decoded.x);
        assert_eq!(original.y, decoded.y);
        assert_eq!(original.z, decoded.z);
        assert_eq!(original.intensity, decoded.intensity);
        assert_eq!(original.return_number, decoded.return_number);
        assert_eq!(original.number_of_returns, decoded.number_of_returns);
        assert_eq!(original.classification, decoded.classification);
        assert_eq!(original.scan_angle, decoded.scan_angle);
        assert_eq!(original.user_data, decoded.user_data);
        assert_eq!(original.point_source_id, decoded.point_source_id);
        assert_eq!(original.gps_time, decoded.gps_time);
        assert_eq!(original.red, decoded.red);
        assert_eq!(original.green, decoded.green);
        assert_eq!(original.blue, decoded.blue);
        assert_eq!(original.nir, decoded.nir);
    }

    #[test]
    fn rawpoint_roundtrip_bulk() {
        let all_zero = RawPoint {
            x: 0,
            y: 0,
            z: 0,
            intensity: 0,
            return_number: 0,
            number_of_returns: 0,
            classification: 0,
            scan_angle: 0,
            user_data: 0,
            point_source_id: 0,
            gps_time: 0.0,
            red: 0,
            green: 0,
            blue: 0,
            nir: 0,
        };
        let points = vec![sample_point(), all_zero, sample_point()];
        let mut encoded = Vec::new();
        RawPoint::write_bulk(&points, &mut encoded).unwrap();
        assert_eq!(encoded.len(), RawPoint::BYTE_SIZE * 3);
        let mut cursor = std::io::Cursor::new(&encoded[..]);
        for expected in &points {
            let got = RawPoint::read(&mut cursor).unwrap();
            assert_eq!(expected.x, got.x);
            assert_eq!(expected.gps_time, got.gps_time);
            assert_eq!(expected.nir, got.nir);
        }
    }

    #[test]
    fn lz4_multi_batch_round_trip() {
        let codec = crate::TempCompression::Lz4;
        // Mixed batch sizes, including a single-point frame.
        let batches: Vec<Vec<RawPoint>> = [10usize, 1, 1000, 42]
            .iter()
            .map(|&n| std::iter::repeat_with(sample_point).take(n).collect())
            .collect();
        let expected: usize = batches.iter().map(|b| b.len()).sum();
        let mut buf: Vec<u8> = Vec::new();
        for batch in &batches {
            write_temp_batch(&mut buf, batch, codec).unwrap();
        }
        let decoded = read_temp_batches(std::io::Cursor::new(&buf[..]), codec).unwrap();
        assert_eq!(
            decoded.len(),
            expected,
            "multi-frame LZ4 decode must recover every point"
        );
    }

    #[test]
    fn lz4_multi_batch_count_temp_file_points() {
        let codec = crate::TempCompression::Lz4;
        let batches: Vec<Vec<RawPoint>> = [7usize, 13, 21]
            .iter()
            .map(|&n| std::iter::repeat_with(sample_point).take(n).collect())
            .collect();
        let expected: u64 = batches.iter().map(|b| b.len() as u64).sum();
        let tmp = std::env::temp_dir().join(format!("copc_lz4_mf_count_{}", std::process::id()));
        {
            // Scoped so the writer is flushed and dropped before counting.
            let mut writer = BufWriter::new(File::create(&tmp).unwrap());
            for batch in &batches {
                write_temp_batch(&mut writer, batch, codec).unwrap();
            }
            writer.flush().unwrap();
        }
        let got = count_temp_file_points(&tmp, codec).unwrap();
        let _ = std::fs::remove_file(&tmp);
        assert_eq!(got, expected, "multi-frame count must match total");
    }

    #[test]
    fn rawpoint_bulk_matches_single() {
        let point = sample_point();
        let mut via_single = Vec::new();
        point.write(&mut via_single).unwrap();
        let mut via_bulk = Vec::new();
        RawPoint::write_bulk(std::slice::from_ref(&point), &mut via_bulk).unwrap();
        assert_eq!(
            via_single, via_bulk,
            "bulk write must produce identical bytes to single write"
        );
    }

    #[test]
    fn input_to_copc_format_mapping() {
        // Every input point format maps onto one of the three COPC formats.
        for (input, expected) in [
            (0, 6),
            (1, 6),
            (4, 6),
            (6, 6),
            (9, 6),
            (2, 7),
            (3, 7),
            (5, 7),
            (7, 7),
            (8, 8),
            (10, 8),
        ] {
            assert_eq!(input_to_copc_format(input), expected);
        }
    }

    /// Wraps a set of chunks in a `ChunkPlan`, deriving the total point count
    /// and grid depth from the inputs.
    fn make_plan(grid_size: u32, chunks: Vec<PlannedChunk>) -> ChunkPlan {
        let total_points = chunks.iter().map(|pc| pc.point_count).sum();
        ChunkPlan {
            grid_size,
            grid_depth: grid_size.trailing_zeros(),
            chunk_target: 1_000_000,
            total_points,
            chunks,
            header_mismatch: None,
        }
    }

    #[test]
    fn build_chunk_lut_single_root_chunk() {
        let root = PlannedChunk {
            level: 0,
            gx: 0,
            gy: 0,
            gz: 0,
            point_count: 100,
        };
        let lut = build_chunk_lut(&make_plan(4, vec![root]));
        assert_eq!(lut.len(), 64);
        // A level-0 chunk covers the whole grid.
        assert!(lut.iter().all(|&c| c == 0));
    }

    #[test]
    fn build_chunk_lut_eight_finest_chunks() {
        let chunks: Vec<PlannedChunk> = (0i32..8)
            .map(|i| PlannedChunk {
                level: 1,
                gx: i % 2,
                gy: (i / 2) % 2,
                gz: i / 4,
                point_count: 10,
            })
            .collect();
        let lut = build_chunk_lut(&make_plan(2, chunks));
        assert_eq!(lut.len(), 8);
        let mut seen = std::collections::HashSet::new();
        for &c in &lut {
            assert!(c < 8, "got out-of-range chunk idx {c}");
            seen.insert(c);
        }
        // Each grid cell belongs to a distinct chunk.
        assert_eq!(seen.len(), 8);
    }

    #[test]
    fn build_chunk_lut_mixed_levels() {
        let coarse = PlannedChunk {
            level: 1,
            gx: 0,
            gy: 0,
            gz: 0,
            point_count: 50,
        };
        let fine = PlannedChunk {
            level: 2,
            gx: 3,
            gy: 3,
            gz: 3,
            point_count: 5,
        };
        let lut = build_chunk_lut(&make_plan(4, vec![coarse, fine]));
        assert_eq!(lut.len(), 64);
        // The level-1 chunk at the origin covers the 2x2x2 corner of the grid.
        for i in 0..8usize {
            let (x, y, z) = (i % 2, (i / 2) % 2, i / 4);
            let idx = x + y * 4 + z * 16;
            assert_eq!(lut[idx], 0, "cell ({x},{y},{z}) should map to chunk 0");
        }
        assert_eq!(lut[3 + 3 * 4 + 3 * 16], 1);
        // 64 cells minus 8 (chunk 0) minus 1 (chunk 1) stay unmapped.
        let uncovered = lut.iter().filter(|&&c| c == u32::MAX).count();
        assert_eq!(uncovered, 55);
    }

    #[test]
    fn chunk_local_extra_levels_basic() {
        for (points, expected) in [
            (0, 0),
            (1, 0),
            (MAX_LEAF_POINTS, 0),
            (MAX_LEAF_POINTS + 1, 1),
            (8 * MAX_LEAF_POINTS, 1),
            (8 * MAX_LEAF_POINTS + 1, 2),
            (36_000_000, 3),
            (100_000_000, 4),
        ] {
            assert_eq!(chunk_local_extra_levels(points), expected);
        }
    }

    #[test]
    fn build_chunk_lut_chunk_at_corner() {
        let corner = PlannedChunk {
            level: 2,
            gx: 3,
            gy: 3,
            gz: 3,
            point_count: 10,
        };
        let lut = build_chunk_lut(&make_plan(8, vec![corner]));
        assert_eq!(lut.len(), 512);
        // Level-2 cell (3,3,3) spans grid cells 6..8 on every axis.
        for i in 0..8usize {
            let (x, y, z) = (6 + i % 2, 6 + (i / 2) % 2, 6 + i / 4);
            let idx = x + y * 8 + z * 64;
            assert_eq!(lut[idx], 0, "cell ({x},{y},{z}) should be in the chunk");
        }
        let in_chunk = lut.iter().filter(|&&c| c == 0).count();
        assert_eq!(in_chunk, 8);
    }
}