//! Pointcloud decoder — reconstructs raw point data from a Cloudini buffer.

use crate::field_decoder::build_decoders;
use crate::header::decode_header;
use crate::types::{CompressionOption, EncodingInfo, POINTS_PER_CHUNK};
use crate::{Error, Result};

/// Decodes a Cloudini buffer back into raw point bytes.
///
/// The decoder is cheap to create and can be reused across multiple buffers.
///
/// # Example
///
/// ```rust
/// use cloudini::{
///     CompressionOption, EncodingInfo, EncodingOptions, FieldType, PointField,
///     PointcloudDecoder, PointcloudEncoder,
/// };
///
/// let info = EncodingInfo {
///     fields: vec![
///         PointField { name: "x".into(), offset: 0, field_type: FieldType::Float32, resolution: Some(0.001) },
///         PointField { name: "y".into(), offset: 4, field_type: FieldType::Float32, resolution: Some(0.001) },
///         PointField { name: "z".into(), offset: 8, field_type: FieldType::Float32, resolution: Some(0.001) },
///     ],
///     width: 50,
///     height: 1,
///     point_step: 12,
///     encoding_opt: EncodingOptions::Lossy,
///     compression_opt: CompressionOption::Lz4,
///     ..EncodingInfo::default()
/// };
///
/// # let raw = vec![0u8; 50 * 12];
/// let compressed = PointcloudEncoder::new(info).encode(&raw).unwrap();
///
/// let decoder = PointcloudDecoder::new();
/// let (encoding_info, decoded) = decoder.decode(&compressed).unwrap();
/// assert_eq!(encoding_info.width, 50);
/// assert_eq!(decoded.len(), raw.len());
/// ```
pub struct PointcloudDecoder {
    #[cfg(feature = "parallel")]
    threads: bool,
}

impl PointcloudDecoder {
    /// Create a new decoder (single-threaded by default).
    pub fn new() -> Self {
        Self {
            #[cfg(feature = "parallel")]
            threads: false,
        }
    }

    /// Enable or disable parallel chunk decoding (requires the `parallel` feature).
    ///
    /// When enabled, each compressed chunk is decompressed and field-decoded on a
    /// separate rayon thread. Most beneficial for large clouds (many chunks).
    ///
    /// Without the `parallel` feature this is a no-op.
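    ///
    /// # Example
    ///
    /// ```rust
    /// use cloudini::PointcloudDecoder;
    ///
    /// // A minimal sketch: opt in to parallel decoding. Built without the
    /// // `parallel` feature, this call leaves the decoder unchanged.
    /// let decoder = PointcloudDecoder::new().with_threads(true);
    /// # let _ = decoder;
    /// ```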
    pub fn with_threads(self, threads: bool) -> Self {
        let _ = threads;
        #[cfg(feature = "parallel")]
        return Self { threads, ..self };
        #[cfg(not(feature = "parallel"))]
        return self;
    }

    /// Decode a complete Cloudini buffer, including its embedded header.
    ///
    /// Returns `(EncodingInfo, point_bytes)` where `point_bytes` is laid out as
    /// `width * height` consecutive points, each of `point_step` bytes.
    ///
    /// # Errors
    ///
    /// Returns an error if the magic bytes are missing, the version is unsupported,
    /// the header YAML is malformed, a chunk is truncated, or decompression fails.
    pub fn decode(&self, data: &[u8]) -> Result<(EncodingInfo, Vec<u8>)> {
        let (info, header_len) = decode_header(data)?;
        let compressed_data = &data[header_len..];
        let point_data = self.decode_with_info(&info, compressed_data)?;
        Ok((info, point_data))
    }

    /// Decode compressed chunk data using a previously parsed [`EncodingInfo`].
    ///
    /// `compressed_data` must **not** include the header — pass the slice starting
    /// immediately after the null terminator that ends the YAML block.
    pub fn decode_with_info(&self, info: &EncodingInfo, compressed_data: &[u8]) -> Result<Vec<u8>> {
        let total_points = info.width as usize * info.height as usize;

        if info.version >= 3 {
            // Version 3+: stream of [u32 LE chunk_size][compressed_bytes] pairs.
            // Scan boundaries first (sequential, cheap — just reading 4-byte lengths).
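            //
            // e.g. a 3-byte chunk followed by a 5-byte chunk looks like:
            //   03 00 00 00 aa bb cc | 05 00 00 00 11 22 33 44 55
            // (the `|` is only a visual separator; the stream is contiguous)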
            let chunk_infos = collect_chunks(compressed_data, total_points)?;

            #[cfg(feature = "parallel")]
            if self.threads {
                return decode_parallel(info, &chunk_infos, total_points);
            }

            decode_sequential(info, &chunk_infos, total_points)
        } else {
            // Version < 3: single unframed chunk (all points in one block).
            let mut output = vec![0u8; total_points * info.point_step as usize];
            let mut decoders = build_decoders(&info.fields, info.encoding_opt);
            let written = decode_one_chunk(
                info,
                compressed_data,
                &mut output,
                &mut decoders,
                total_points,
            )?;
            output.truncate(written);
            Ok(output)
        }
    }
}

impl Default for PointcloudDecoder {
    fn default() -> Self {
        Self::new()
    }
}

/// A located chunk: byte slice in the compressed stream + expected point count.
struct ChunkInfo<'a> {
    bytes: &'a [u8],
    points: usize,
}

/// Walk the framed chunk stream and collect slices + expected point counts.
fn collect_chunks<'a>(
    compressed_data: &'a [u8],
    total_points: usize,
) -> Result<Vec<ChunkInfo<'a>>> {
    let mut chunks = Vec::new();
    let mut pos = 0;
    let mut points_remaining = total_points;

    while pos < compressed_data.len() {
        if pos + 4 > compressed_data.len() {
            return Err(Error::Truncated("chunk size prefix".into()));
        }
        let chunk_size = u32::from_le_bytes([
            compressed_data[pos],
            compressed_data[pos + 1],
            compressed_data[pos + 2],
            compressed_data[pos + 3],
        ]) as usize;
        pos += 4;

        if pos + chunk_size > compressed_data.len() {
            return Err(Error::Truncated("chunk size exceeds buffer".into()));
        }

        let chunk_points = points_remaining.min(POINTS_PER_CHUNK);
        chunks.push(ChunkInfo {
            bytes: &compressed_data[pos..pos + chunk_size],
            points: chunk_points,
        });
        points_remaining -= chunk_points;
        pos += chunk_size;
    }

    Ok(chunks)
}

// ── sequential decode ─────────────────────────────────────────────────────────

fn decode_sequential(
    info: &EncodingInfo,
    chunks: &[ChunkInfo<'_>],
    total_points: usize,
) -> Result<Vec<u8>> {
    let point_step = info.point_step as usize;
    let mut output = vec![0u8; total_points * point_step];
    let mut decoders = build_decoders(&info.fields, info.encoding_opt);
    let mut output_pos = 0;

    for chunk in chunks {
        let out_slice = &mut output[output_pos..output_pos + chunk.points * point_step];
        let written = decode_one_chunk(info, chunk.bytes, out_slice, &mut decoders, chunk.points)?;
        output_pos += written;
    }

    output.truncate(output_pos);
    Ok(output)
}

// ── parallel decode ───────────────────────────────────────────────────────────

#[cfg(feature = "parallel")]
fn decode_parallel(
    info: &EncodingInfo,
    chunks: &[ChunkInfo<'_>],
    total_points: usize,
) -> Result<Vec<u8>> {
    use rayon::prelude::*;

    // Each chunk is decoded independently (fresh decoder state) into its own Vec,
    // then the results are concatenated. This avoids unsafe aliasing on the output
    // buffer while still parallelising the dominant decompression + field-decode work.
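    // (Chunk point counts are known up front, so a zero-copy alternative would
    // be rayon's `par_chunks_mut` over one pre-allocated output buffer; the
    // per-chunk Vecs here trade a final concatenation copy for simpler code.)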
    let chunk_outputs: Vec<Result<Vec<u8>>> = chunks
        .par_iter()
        .map(|chunk| {
            let point_step = info.point_step as usize;
            let max_output = total_points * point_step;
            let decompressed = decompress_chunk(chunk.bytes, info.compression_opt, max_output)?;
            let encoded_data = decompressed.as_deref().unwrap_or(chunk.bytes);

            let mut decoders = build_decoders(&info.fields, info.encoding_opt);
            let mut out = vec![0u8; chunk.points * point_step];
            let mut enc_pos = 0;
            let mut out_pos = 0;

            for _ in 0..chunk.points {
                let point_out = &mut out[out_pos..out_pos + point_step];
                for dec in decoders.iter_mut() {
                    let consumed = dec.decode(&encoded_data[enc_pos..], point_out)?;
                    enc_pos += consumed;
                }
                out_pos += point_step;
            }

            Ok(out)
        })
        .collect();

    let mut output = Vec::with_capacity(total_points * info.point_step as usize);
    for result in chunk_outputs {
        output.extend_from_slice(&result?);
    }
    Ok(output)
}

// ── shared helpers ────────────────────────────────────────────────────────────

/// Decompress and field-decode one chunk into `output`. Resets `decoders` first.
///
/// Returns the number of bytes written. The caller must pass an output slice sized
/// to exactly `chunk_points * point_step` bytes.
fn decode_one_chunk(
    info: &EncodingInfo,
    chunk: &[u8],
    output: &mut [u8],
    decoders: &mut [crate::field_decoder::FieldDecoder],
    chunk_points: usize,
) -> Result<usize> {
    let max_output = info.width as usize * info.height as usize * info.point_step as usize;
    let decompressed = decompress_chunk(chunk, info.compression_opt, max_output)?;
    let encoded_data = decompressed.as_deref().unwrap_or(chunk);

    // Reset delta/XOR state for this chunk (decoders are reused across chunks).
    for dec in decoders.iter_mut() {
        dec.reset();
    }

    let point_step = info.point_step as usize;
    let mut enc_pos = 0;
    let mut out_pos = 0;

    // Count-based loop: iterate exactly chunk_points times. The caller passes an
    // output slice sized to exactly chunk_points * point_step bytes, so the
    // per-iteration "output buffer too small" check is replaced by a single slice bound.
    for _ in 0..chunk_points {
        let point_out = &mut output[out_pos..out_pos + point_step];
        for dec in decoders.iter_mut() {
            let consumed = dec.decode(&encoded_data[enc_pos..], point_out)?;
            enc_pos += consumed;
        }
        out_pos += point_step;
    }

    Ok(out_pos)
}

/// Decompress one chunk. Returns `Some(decompressed)` or `None` for [`CompressionOption::None`].
fn decompress_chunk(
    data: &[u8],
    opt: CompressionOption,
    max_output: usize,
) -> Result<Option<Vec<u8>>> {
    match opt {
        CompressionOption::None => Ok(None),
        CompressionOption::Lz4 => {
            let mut out = vec![0u8; max_output];
            let n = lz4_flex::block::decompress_into(data, &mut out)
                .map_err(|e| Error::Lz4(e.to_string()))?;
            out.truncate(n);
            Ok(Some(out))
        }
        CompressionOption::Zstd => {
            let out =
                zstd::bulk::decompress(data, max_output).map_err(|e| Error::Zstd(e.to_string()))?;
            Ok(Some(out))
        }
    }
}
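
// A sketch of a unit test for the chunk framing walk. The byte values are
// arbitrary; only the length-prefix parsing and point budgeting are exercised.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn collect_chunks_walks_framed_stream() {
        // Two framed chunks: [len=3][1,2,3] then [len=3][4,5,6].
        let mut buf = Vec::new();
        buf.extend_from_slice(&3u32.to_le_bytes());
        buf.extend_from_slice(&[1, 2, 3]);
        buf.extend_from_slice(&3u32.to_le_bytes());
        buf.extend_from_slice(&[4, 5, 6]);

        // Request two chunks' worth of points so each chunk gets a full budget.
        let chunks = collect_chunks(&buf, 2 * POINTS_PER_CHUNK).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(chunks[0].bytes, &[1u8, 2, 3][..]);
        assert_eq!(chunks[0].points, POINTS_PER_CHUNK);
        assert_eq!(chunks[1].bytes, &[4u8, 5, 6][..]);
        assert_eq!(chunks[1].points, POINTS_PER_CHUNK);

        // A buffer that ends inside a length prefix must be reported truncated.
        assert!(collect_chunks(&buf[..2], POINTS_PER_CHUNK).is_err());
    }
}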