plozone 0.1.1

3D spatial zone engine: geofencing, octree hole-scanning, realtime sync (WebSocket + QUIC + io_uring), voxel pathfinding, and AV sensor fusion.
Documentation
//! Realtime ingestion pipeline (feature `pipeline`).
//!
//! A blocking worker loop that reads [`Frame`]s from a channel, inserts their
//! points into the octree, and periodically runs hole scans over all zones.
//! The caller provides the frame source and receives scan results on an
//! outbound channel — the pipeline itself is transport-agnostic.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};

use crossbeam_channel::{Receiver, Sender};

use crate::octree::OctreeNode;
use crate::scan::{ScanMode, ScanResult, run_scan};
use crate::store::ZoneStore;

/// Shared octree behind an `RwLock`.
///
/// [`start_pipeline`] takes the write lock while inserting a frame's points
/// and the read lock while running scans.
pub type SharedOctree = Arc<RwLock<OctreeNode>>;
/// Shared zone store behind an `RwLock`.
///
/// [`start_pipeline`] only ever takes the read lock on the store, to list
/// zone ids and to run scans against them.
pub type SharedStore = Arc<RwLock<ZoneStore>>;

/// Tuning knobs for the pipeline loop.
///
/// Consumed by [`start_pipeline`]; see [`Default`] for the stock values.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Octree subdivision threshold (passed to `OctreeNode::insert`).
    ///
    /// Used for every frame whose own [`Frame::threshold`] is `None`.
    pub threshold: usize,
    /// Run a hole scan every this many frames.
    ///
    /// The loop counts received frames and scans whenever the count is a
    /// multiple of this value, so `1` scans after every frame. A value of `0`
    /// never matches a non-zero count, which effectively disables scanning.
    pub scan_every: usize,
    /// Scan strategy (coarse / precise / adaptive).
    pub mode: ScanMode,
}

impl Default for PipelineConfig {
    fn default() -> Self {
        Self { threshold: 8, scan_every: 100, mode: ScanMode::Coarse { depth: 6 } }
    }
}

/// A single point cloud frame arriving from any sensor.
///
/// This is the unit of work sent to [`start_pipeline`] over its frame channel.
#[derive(Debug, Clone)]
pub struct Frame {
    /// ENU-metre points to insert.
    ///
    /// Every point is inserted into the shared octree individually.
    pub points: Vec<[f64; 3]>,
    /// Octree subdivision threshold override (uses config default if `None`).
    ///
    /// When `Some`, it applies to all points of this frame only.
    pub threshold: Option<usize>,
}

/// Start the pipeline loop on the **current** thread (blocking).
///
/// Callers typically wrap this in `std::thread::spawn`.
///
/// ```ignore
/// let (frame_tx, frame_rx) = crossbeam_channel::bounded(256);
/// let (result_tx, result_rx) = crossbeam_channel::unbounded();
/// let running = Arc::new(AtomicBool::new(true));
///
/// std::thread::spawn(move || {
///     start_pipeline(config, octree, store, frame_rx, result_tx, running);
/// });
/// ```
pub fn start_pipeline(
    config: PipelineConfig,
    octree: SharedOctree,
    store: SharedStore,
    frame_rx: Receiver<Frame>,
    result_tx: Sender<(u32, ScanResult)>,
    running: Arc<AtomicBool>,
) {
    let mut tick = 0usize;

    while running.load(Ordering::Relaxed) {
        let frame = match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) {
            Ok(f) => f,
            Err(_) => continue,
        };

        {
            let mut tree = octree.write().unwrap();
            let threshold = frame.threshold.unwrap_or(config.threshold);
            for &p in &frame.points {
                tree.insert(p, threshold);
            }
        }

        tick += 1;

        if tick.is_multiple_of(config.scan_every) {
            let tree = octree.read().unwrap();
            let store = store.read().unwrap();

            #[cfg(feature = "parallel")]
            {
                use rayon::prelude::*;
                let ids = store.ids();
                ids.par_iter().for_each(|&id| {
                    let result = run_scan(&tree, &store, id, &config.mode);
                    let _ = result_tx.try_send((id, result));
                });
            }

            #[cfg(not(feature = "parallel"))]
            {
                for id in store.ids() {
                    let result = run_scan(&tree, &store, id, &config.mode);
                    let _ = result_tx.try_send((id, result));
                }
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::coord::EnuConverter;
    use crate::zone::{Zone, ZoneEntry};

    /// End-to-end smoke test: push one frame in, expect one scan result out.
    #[test]
    fn pipeline_inserts_and_scans() {
        use std::time::Duration;

        // One cylindrical zone (id 1) whose centre reuses the first two
        // coordinates handed to the converter.
        let converter = EnuConverter::new(10.7626, 106.6601, 0.0);
        let cylinder = Zone::Cylinder {
            center: [10.7626, 106.6601],
            radius_m: 30.0,
            z_min: 0.0,
            z_max: 10.0,
        };
        let zones = ZoneStore::from_entries(&[ZoneEntry::new(1, cylinder)], &converter);

        let shared_tree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 64.0)));
        let shared_zones = Arc::new(RwLock::new(zones));

        let (frame_tx, frame_rx) = crossbeam_channel::bounded(64);
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let running = Arc::new(AtomicBool::new(true));

        // `scan_every: 1` makes the very first frame trigger a scan.
        let config = PipelineConfig {
            threshold: 8,
            scan_every: 1,
            mode: ScanMode::Coarse { depth: 4 },
        };

        let worker = {
            let tree_handle = Arc::clone(&shared_tree);
            let zones_handle = Arc::clone(&shared_zones);
            let run_flag = Arc::clone(&running);
            std::thread::spawn(move || {
                start_pipeline(config, tree_handle, zones_handle, frame_rx, result_tx, run_flag)
            })
        };

        let single_point = Frame { points: vec![[0.0, 0.0, 5.0]], threshold: None };
        frame_tx.send(single_point).unwrap();

        let outcome = result_rx.recv_timeout(Duration::from_secs(2));
        assert!(outcome.is_ok(), "pipeline should emit a scan result");
        let (zone_id, scan) = outcome.unwrap();
        assert_eq!(zone_id, 1);
        assert!(scan.coverage_pct > 0.0, "some coverage after inserting a point");

        // Ask the worker to stop and wait for it to exit.
        running.store(false, Ordering::Relaxed);
        worker.join().unwrap();
    }
}