use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use crossbeam_channel::{Receiver, Sender};
use crate::octree::OctreeNode;
use crate::scan::{ScanMode, ScanResult, run_scan};
use crate::store::ZoneStore;
/// Reference-counted octree guarded by a reader-writer lock, so it can be handed to several owners.
pub type SharedOctree = Arc<RwLock<OctreeNode>>;
/// Reference-counted zone store guarded by a reader-writer lock, so it can be handed to several owners.
pub type SharedStore = Arc<RwLock<ZoneStore>>;
/// Settings read by [`start_pipeline`].
#[derive(Debug, Clone)]
pub struct PipelineConfig {
/// Threshold passed to the octree's `insert` for frames that do not carry their own `threshold`.
pub threshold: usize,
/// Scan period in frames: a scan runs whenever the count of received frames is a multiple of this value.
pub scan_every: usize,
/// Scan mode forwarded to `run_scan` for every zone id.
pub mode: ScanMode,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self { threshold: 8, scan_every: 100, mode: ScanMode::Coarse { depth: 6 } }
}
}
/// One batch of points received by [`start_pipeline`] over its frame channel.
#[derive(Debug, Clone)]
pub struct Frame {
/// Points to insert into the octree, three `f64` components each.
/// NOTE(review): coordinate frame and units are not visible here — confirm against the producer.
pub points: Vec<[f64; 3]>,
/// Per-frame override for the insert threshold; `None` falls back to `PipelineConfig::threshold`.
pub threshold: Option<usize>,
}
/// Runs the ingest-and-scan loop on the calling thread until it is told to stop.
///
/// Every [`Frame`] received on `frame_rx` has its points inserted into `octree` under the write
/// lock, using the frame's own `threshold` when present and `config.threshold` otherwise.
/// Whenever the number of frames received so far is a multiple of `config.scan_every`, every id
/// returned by `store.ids()` is passed to `run_scan` (through rayon when the `parallel` feature
/// is enabled) and the `(id, result)` pair is offered to `result_tx`.
///
/// The loop ends when `running` reads `false` (checked after each frame and after each 50 ms
/// receive timeout) or when every sender for `frame_rx` has been dropped and the channel is
/// drained.
///
/// Results are delivered best-effort with `try_send`; a result that cannot be enqueued is
/// dropped silently.
///
/// A `scan_every` of `0` never triggers a scan, because no positive frame count is a multiple
/// of zero.
///
/// # Panics
///
/// Panics if the octree or store lock is poisoned.
pub fn start_pipeline(
    config: PipelineConfig,
    octree: SharedOctree,
    store: SharedStore,
    frame_rx: Receiver<Frame>,
    result_tx: Sender<(u32, ScanResult)>,
    running: Arc<AtomicBool>,
) {
    let mut tick = 0usize;
    while running.load(Ordering::Relaxed) {
        let frame = match frame_rx.recv_timeout(std::time::Duration::from_millis(50)) {
            Ok(frame) => frame,
            // Every sender is gone and the queue is empty: no frame can ever arrive again.
            // Retrying would return immediately each time and spin the CPU, so stop instead.
            Err(err) if err.is_disconnected() => break,
            // Plain timeout: loop around so the `running` flag is re-checked.
            Err(_) => continue,
        };

        // Resolve the threshold before locking to keep the write-locked section minimal.
        let threshold = frame.threshold.unwrap_or(config.threshold);
        {
            let mut tree = octree.write().expect("octree lock poisoned");
            for &p in &frame.points {
                tree.insert(p, threshold);
            }
        }

        tick += 1;
        if tick.is_multiple_of(config.scan_every) {
            let tree = octree.read().expect("octree lock poisoned");
            let store = store.read().expect("zone store lock poisoned");
            #[cfg(feature = "parallel")]
            {
                use rayon::prelude::*;
                let ids = store.ids();
                ids.par_iter().for_each(|&id| {
                    let result = run_scan(&tree, &store, id, &config.mode);
                    // Best-effort delivery: drop the result if it cannot be enqueued.
                    let _ = result_tx.try_send((id, result));
                });
            }
            #[cfg(not(feature = "parallel"))]
            {
                for id in store.ids() {
                    let result = run_scan(&tree, &store, id, &config.mode);
                    // Best-effort delivery: drop the result if it cannot be enqueued.
                    let _ = result_tx.try_send((id, result));
                }
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::coord::EnuConverter;
    use crate::zone::{Zone, ZoneEntry};

    /// Smoke test: push one frame through a pipeline running on a background thread and check
    /// that a scan result for the single configured zone comes back with non-zero coverage.
    #[test]
    fn pipeline_inserts_and_scans() {
        use std::time::Duration;

        // A store holding one cylindrical zone with id 1.
        let converter = EnuConverter::new(10.7626, 106.6601, 0.0);
        let cylinder = Zone::Cylinder {
            center: [10.7626, 106.6601],
            radius_m: 30.0,
            z_min: 0.0,
            z_max: 10.0,
        };
        let zones = ZoneStore::from_entries(&[ZoneEntry::new(1, cylinder)], &converter);

        let shared_tree = Arc::new(RwLock::new(OctreeNode::new([0.0; 3], 64.0)));
        let shared_zones = Arc::new(RwLock::new(zones));
        let (frame_tx, frame_rx) = crossbeam_channel::bounded(64);
        let (result_tx, result_rx) = crossbeam_channel::unbounded();
        let running = Arc::new(AtomicBool::new(true));

        // `scan_every: 1` makes the very first frame trigger a scan.
        let config = PipelineConfig {
            threshold: 8,
            scan_every: 1,
            mode: ScanMode::Coarse { depth: 4 },
        };

        // The worker thread gets its own handles; the originals stay with the test.
        let worker = {
            let tree = Arc::clone(&shared_tree);
            let zones = Arc::clone(&shared_zones);
            let keep_going = Arc::clone(&running);
            std::thread::spawn(move || {
                start_pipeline(config, tree, zones, frame_rx, result_tx, keep_going);
            })
        };

        let frame = Frame {
            points: vec![[0.0, 0.0, 5.0]],
            threshold: None,
        };
        frame_tx.send(frame).unwrap();

        let Ok((zone_id, result)) = result_rx.recv_timeout(Duration::from_secs(2)) else {
            panic!("pipeline should emit a scan result");
        };
        assert_eq!(zone_id, 1);
        assert!(result.coverage_pct > 0.0, "some coverage after inserting a point");

        // Ask the loop to stop, then wait for the worker to finish.
        running.store(false, Ordering::Relaxed);
        worker.join().unwrap();
    }
}