use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use threecrate_core::{Error, Point3f, PointCloud, Result};
/// A processing stage that consumes a stream of `T` one chunk at a time and
/// yields a single result once the caller is done feeding it.
pub trait StreamingPipeline<T> {
/// Value produced by [`finalize`](Self::finalize).
type Output;
/// Feeds one chunk of items into the pipeline.
///
/// # Errors
/// Implementations return an error when the chunk cannot be processed.
fn process_chunk(&mut self, chunk: &[T]) -> Result<()>;
/// Consumes the pipeline and returns its accumulated result.
fn finalize(self) -> Result<Self::Output>;
/// Reports an estimate of the pipeline's memory use in bytes.
///
/// The default implementation returns 0; implementors may override it.
fn memory_bytes(&self) -> usize { 0 }
}
/// Counters returned by [`run_pipeline`] / [`run_pipeline_with_options`].
#[derive(Debug, Clone, Default)]
pub struct RunStats {
/// Total number of items that were handed to `process_chunk`.
pub items_processed: usize,
/// Number of `process_chunk` calls that completed successfully.
pub chunks_processed: usize,
/// Number of `Err` items from the source that were skipped
/// (only incremented when `RunOptions::skip_errors` is set).
pub errors_skipped: usize,
}
/// Options controlling how [`run_pipeline_with_options`] reacts to bad input.
///
/// The derived `Default` yields `skip_errors: false`, i.e. the first source
/// error aborts the run.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// When `true`, `Err` items coming from the source are counted and skipped;
    /// when `false`, the first such error is returned to the caller.
    pub skip_errors: bool,
}
/// Streams `source` through `pipeline` in chunks of `chunk_size`, using the
/// default [`RunOptions`] (source errors are not skipped).
///
/// This is a convenience wrapper around [`run_pipeline_with_options`].
///
/// # Errors
/// Propagates any error returned by [`run_pipeline_with_options`].
pub fn run_pipeline<T, P>(
    pipeline: &mut P,
    source: impl Iterator<Item = Result<T>>,
    chunk_size: usize,
) -> Result<RunStats>
where
    P: StreamingPipeline<T>,
{
    let default_opts = RunOptions::default();
    run_pipeline_with_options(pipeline, source, chunk_size, &default_opts)
}
/// Drains `source`, batching its items into chunks of `chunk_size` and handing
/// each full chunk to `pipeline.process_chunk`.
///
/// A trailing partial chunk is processed after the source is exhausted.
/// The pipeline is not finalized here; the caller keeps ownership of it.
///
/// # Errors
/// * `Error::InvalidData` when `chunk_size` is 0.
/// * The first `Err` item yielded by `source`, unless `opts.skip_errors` is
///   set, in which case such items are counted in `errors_skipped` and dropped.
/// * Any error returned by `pipeline.process_chunk`.
pub fn run_pipeline_with_options<T, P>(
    pipeline: &mut P,
    source: impl Iterator<Item = Result<T>>,
    chunk_size: usize,
    opts: &RunOptions,
) -> Result<RunStats>
where
    P: StreamingPipeline<T>,
{
    if chunk_size == 0 {
        return Err(Error::InvalidData("chunk_size must be ≥ 1".into()));
    }

    let mut stats = RunStats::default();
    let mut buffer: Vec<T> = Vec::with_capacity(chunk_size);

    for entry in source {
        let value = match entry {
            Ok(value) => value,
            Err(_) if opts.skip_errors => {
                stats.errors_skipped += 1;
                continue;
            }
            Err(err) => return Err(err),
        };

        buffer.push(value);
        if buffer.len() < chunk_size {
            continue;
        }

        // The buffer just reached a full chunk: hand it over and start again.
        pipeline.process_chunk(&buffer)?;
        stats.items_processed += buffer.len();
        stats.chunks_processed += 1;
        buffer.clear();
    }

    // Flush whatever is left over as a final, shorter chunk.
    if !buffer.is_empty() {
        pipeline.process_chunk(&buffer)?;
        stats.items_processed += buffer.len();
        stats.chunks_processed += 1;
    }

    Ok(stats)
}
/// Configuration for [`StreamingVoxelFilter`].
#[derive(Debug, Clone)]
pub struct StreamingVoxelFilterConfig {
/// Edge length of a voxel; point coordinates are divided by this value to
/// obtain integer voxel indices. Non-positive values are rejected when the
/// filter processes a chunk.
pub voxel_size: f32,
}
/// Voxel-grid filter that accumulates points chunk by chunk and emits one
/// averaged point per occupied voxel when finalized.
pub struct StreamingVoxelFilter {
// Filter settings (voxel edge length).
config: StreamingVoxelFilterConfig,
// Per-voxel accumulator keyed by integer voxel index:
// (sum of x/y/z coordinates, number of points in the voxel).
voxels: HashMap<(i32, i32, i32), ([f64; 3], u32)>,
}
impl StreamingVoxelFilter {
    /// Creates a filter with no occupied voxels.
    pub fn new(config: StreamingVoxelFilterConfig) -> Self {
        let voxels = HashMap::new();
        Self { config, voxels }
    }

    /// Maps a point to the integer index of the voxel containing it: each
    /// coordinate is scaled by `1 / voxel_size` and floored.
    #[inline]
    fn voxel_key(&self, p: &Point3f) -> (i32, i32, i32) {
        let scale = 1.0 / self.config.voxel_size;
        let cell = |coord: f32| (coord * scale).floor() as i32;
        (cell(p.x), cell(p.y), cell(p.z))
    }

    /// Number of voxels that currently hold at least one point.
    pub fn voxel_count(&self) -> usize {
        self.voxels.len()
    }
}
impl StreamingPipeline<Point3f> for StreamingVoxelFilter {
    type Output = PointCloud<Point3f>;

    /// Adds every point of `chunk` to the running coordinate sum and count of
    /// the voxel it falls into.
    ///
    /// Point coordinates themselves are not validated here.
    ///
    /// # Errors
    /// Returns `Error::InvalidData` when the configured `voxel_size` is NaN,
    /// zero or negative.
    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
        let voxel_size = self.config.voxel_size;
        // `NaN <= 0.0` is false, so NaN must be rejected explicitly; otherwise
        // every key would be derived from NaN and the saturating float-to-int
        // cast would silently place all points in voxel (0, 0, 0).
        if voxel_size.is_nan() || voxel_size <= 0.0 {
            return Err(Error::InvalidData("voxel_size must be positive".into()));
        }

        for p in chunk {
            let key = self.voxel_key(p);
            let (sum, count) = self.voxels.entry(key).or_insert(([0.0; 3], 0));
            // Accumulate in f64 so that long streams lose less precision.
            sum[0] += p.x as f64;
            sum[1] += p.y as f64;
            sum[2] += p.z as f64;
            *count += 1;
        }
        Ok(())
    }

    /// Produces one point per occupied voxel: the mean of all points that were
    /// accumulated into it. The order of the output points follows the hash
    /// map's iteration order.
    fn finalize(self) -> Result<PointCloud<Point3f>> {
        let points: Vec<Point3f> = self
            .voxels
            .values()
            .map(|(sum, count)| {
                // `count` is at least 1: entries are only created together
                // with their first point.
                let n = *count as f64;
                Point3f::new((sum[0] / n) as f32, (sum[1] / n) as f32, (sum[2] / n) as f32)
            })
            .collect();
        Ok(PointCloud::from_points(points))
    }

    /// Rough estimate of the accumulator's size: a fixed per-voxel byte budget
    /// multiplied by the number of occupied voxels.
    fn memory_bytes(&self) -> usize {
        // Approximate cost of one map entry (key, sums, count and table overhead).
        const APPROX_BYTES_PER_VOXEL: usize = 90;
        self.voxels.len() * APPROX_BYTES_PER_VOXEL
    }
}
/// Summary statistics produced by [`StreamingStatistics`].
#[derive(Debug, Clone)]
pub struct PointCloudStats {
/// Number of points that were processed.
pub point_count: u64,
/// Per-axis minimum coordinate.
pub min: Point3f,
/// Per-axis maximum coordinate.
pub max: Point3f,
/// Per-axis arithmetic mean of the coordinates.
pub mean: Point3f,
}
/// Running accumulator for point count and per-axis minimum, maximum and sum
/// over a stream of points.
pub struct StreamingStatistics {
    /// Number of points seen so far.
    count: u64,
    /// Smallest x/y/z seen so far (starts at +∞).
    min: [f32; 3],
    /// Largest x/y/z seen so far (starts at −∞).
    max: [f32; 3],
    /// Sum of x/y/z over all points, kept in `f64`.
    sum: [f64; 3],
}

impl StreamingStatistics {
    /// Creates an empty accumulator; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for StreamingStatistics {
    /// Empty accumulator: zero count and sums, with min/max seeded so that the
    /// first point replaces them.
    fn default() -> Self {
        let min = [f32::INFINITY; 3];
        let max = [f32::NEG_INFINITY; 3];
        Self {
            count: 0,
            min,
            max,
            sum: [0.0; 3],
        }
    }
}
impl StreamingPipeline<Point3f> for StreamingStatistics {
    type Output = PointCloudStats;

    /// Folds every point of `chunk` into the running count, per-axis
    /// minimum/maximum and per-axis sum. Never fails.
    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
        for point in chunk {
            self.count += 1;
            let coords = [point.x, point.y, point.z];
            for (axis, &value) in coords.iter().enumerate() {
                self.min[axis] = self.min[axis].min(value);
                self.max[axis] = self.max[axis].max(value);
                self.sum[axis] += value as f64;
            }
        }
        Ok(())
    }

    /// Converts the accumulated values into a [`PointCloudStats`].
    ///
    /// # Errors
    /// Returns `Error::InvalidData` when no point was ever processed, because
    /// min, max and mean are undefined for an empty stream.
    fn finalize(self) -> Result<PointCloudStats> {
        if self.count == 0 {
            return Err(Error::InvalidData("no points were processed".into()));
        }

        let total = self.count as f64;
        let [min_x, min_y, min_z] = self.min;
        let [max_x, max_y, max_z] = self.max;
        let mean_of = |axis: usize| (self.sum[axis] / total) as f32;

        Ok(PointCloudStats {
            point_count: self.count,
            min: Point3f::new(min_x, min_y, min_z),
            max: Point3f::new(max_x, max_y, max_z),
            mean: Point3f::new(mean_of(0), mean_of(1), mean_of(2)),
        })
    }

    /// The accumulator has a fixed size, so this is simply the size of the struct.
    fn memory_bytes(&self) -> usize {
        std::mem::size_of::<StreamingStatistics>()
    }
}
/// Pipeline stage that simply buffers every point it receives.
pub struct StreamingCollector {
    /// Points collected so far, in arrival order.
    points: Vec<Point3f>,
}

impl StreamingCollector {
    /// Creates an empty collector; equivalent to `Default::default()`.
    pub fn new() -> Self {
        Self::default()
    }

    /// Creates an empty collector whose buffer is pre-allocated for `cap` points.
    pub fn with_capacity(cap: usize) -> Self {
        let points = Vec::with_capacity(cap);
        Self { points }
    }
}

impl Default for StreamingCollector {
    /// Empty collector with an unallocated buffer.
    fn default() -> Self {
        Self { points: Vec::new() }
    }
}
impl StreamingPipeline<Point3f> for StreamingCollector {
    type Output = PointCloud<Point3f>;

    /// Appends all points of `chunk` to the internal buffer. Never fails.
    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
        self.points.extend(chunk.iter().copied());
        Ok(())
    }

    /// Wraps the buffered points in a [`PointCloud`].
    fn finalize(self) -> Result<PointCloud<Point3f>> {
        let collected = self.points;
        Ok(PointCloud::from_points(collected))
    }

    /// Bytes occupied by the points currently stored (length, not capacity).
    fn memory_bytes(&self) -> usize {
        let per_point = std::mem::size_of::<Point3f>();
        self.points.len() * per_point
    }
}
/// Adapts an in-memory cloud to the fallible item stream expected by
/// [`run_pipeline`]: every point is copied out and wrapped in `Ok`.
pub fn cloud_as_stream(
    cloud: &PointCloud<Point3f>,
) -> impl Iterator<Item = Result<Point3f>> + '_ {
    cloud.points.iter().map(|point| Ok(*point))
}
/// Tuning knobs for [`RealtimePipeline`].
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Capacity of the bounded channel between producers and the worker.
    pub max_queue_depth: usize,
    /// Number of items the worker gathers before calling `process_chunk`.
    pub chunk_size: usize,
    /// When `Some`, the worker flushes a partial chunk if no item arrives
    /// within this duration; when `None`, it waits for a full chunk or for the
    /// channel to close.
    pub flush_timeout: Option<Duration>,
}

impl Default for BackpressureConfig {
    /// Queue depth 1024, chunk size 256, flush timeout 10 ms.
    fn default() -> Self {
        const QUEUE_DEPTH: usize = 1024;
        const CHUNK_SIZE: usize = 256;
        const FLUSH_AFTER: Duration = Duration::from_millis(10);

        Self {
            max_queue_depth: QUEUE_DEPTH,
            chunk_size: CHUNK_SIZE,
            flush_timeout: Some(FLUSH_AFTER),
        }
    }
}
/// Point-in-time copy of a realtime pipeline's counters.
#[derive(Debug, Clone, Default)]
pub struct RealtimeMetrics {
    /// Items accepted into the queue so far.
    pub items_queued: u64,
    /// Items the worker has taken off the queue so far.
    pub items_processed: u64,
    /// Items rejected by `try_send` because the queue was full.
    pub items_dropped: u64,
    /// `items_queued - items_processed`, clamped at zero.
    pub estimated_queue_depth: u64,
}

/// Lock-free counters shared between the producer side and the worker thread.
struct SharedMetrics {
    items_queued: AtomicU64,
    items_processed: AtomicU64,
    items_dropped: AtomicU64,
}

impl SharedMetrics {
    /// Creates a reference-counted set of counters, all starting at zero.
    fn new() -> Arc<Self> {
        let zeroed = || AtomicU64::new(0);
        Arc::new(Self {
            items_queued: zeroed(),
            items_processed: zeroed(),
            items_dropped: zeroed(),
        })
    }

    /// Reads all counters with relaxed ordering and derives the queue depth.
    ///
    /// The three loads are independent, so the snapshot is only approximately
    /// consistent; the subtraction saturates in case `processed` is observed
    /// ahead of `queued`.
    fn snapshot(&self) -> RealtimeMetrics {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        let items_queued = read(&self.items_queued);
        let items_processed = read(&self.items_processed);
        let items_dropped = read(&self.items_dropped);
        RealtimeMetrics {
            items_queued,
            items_processed,
            items_dropped,
            estimated_queue_depth: items_queued.saturating_sub(items_processed),
        }
    }
}
/// Handle to a pipeline that runs on a background worker thread and is fed
/// through a bounded channel.
pub struct RealtimePipeline<T: Send + 'static, O: Send + 'static> {
// Sending half of the channel; cleared on finish/drop so the worker sees the
// channel disconnect.
sender: Option<SyncSender<T>>,
// Counters shared with the worker thread.
metrics: Arc<SharedMetrics>,
// Worker thread handle; taken when the worker is joined.
join_handle: Option<JoinHandle<Result<O>>>,
}
impl<T: Send + 'static, O: Send + 'static> RealtimePipeline<T, O> {
pub fn new<P>(pipeline: P, config: BackpressureConfig) -> Self
where
P: StreamingPipeline<T, Output = O> + Send + 'static,
{
assert!(config.chunk_size >= 1, "chunk_size must be ≥ 1");
assert!(config.max_queue_depth >= 1, "max_queue_depth must be ≥ 1");
let (sender, receiver) = sync_channel::<T>(config.max_queue_depth);
let metrics = SharedMetrics::new();
let metrics_worker = Arc::clone(&metrics);
let chunk_size = config.chunk_size;
let flush_timeout = config.flush_timeout;
let join_handle = thread::spawn(move || {
realtime_worker(receiver, pipeline, chunk_size, flush_timeout, metrics_worker)
});
Self { sender: Some(sender), metrics, join_handle: Some(join_handle) }
}
pub fn send(&self, item: T) -> Result<()> {
let sender = self
.sender
.as_ref()
.ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
sender
.send(item)
.map_err(|_| Error::InvalidData("pipeline worker has terminated".into()))?;
self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
Ok(())
}
pub fn try_send(&self, item: T) -> Result<bool> {
let sender = self
.sender
.as_ref()
.ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
match sender.try_send(item) {
Ok(()) => {
self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
Ok(true)
}
Err(TrySendError::Full(_)) => {
self.metrics.items_dropped.fetch_add(1, Ordering::Relaxed);
Ok(false)
}
Err(TrySendError::Disconnected(_)) => {
Err(Error::InvalidData("pipeline worker has terminated".into()))
}
}
}
pub fn metrics(&self) -> RealtimeMetrics {
self.metrics.snapshot()
}
pub fn finish(mut self) -> Result<O> {
self.sender = None;
self.join_handle
.take()
.expect("pipeline already finished")
.join()
.map_err(|_| Error::InvalidData("pipeline worker panicked".into()))?
}
}
impl<T: Send + 'static, O: Send + 'static> Drop for RealtimePipeline<T, O> {
    /// Closes the channel and waits for the worker thread to exit, discarding
    /// its result. After `finish` both fields are already empty, so this does
    /// nothing further.
    fn drop(&mut self) {
        // Release the sender first so the worker observes the disconnect.
        drop(self.sender.take());
        if let Some(worker) = self.join_handle.take() {
            // The outcome (including a worker panic) is intentionally ignored.
            let _ = worker.join();
        }
    }
}
/// Worker loop: pulls items from `receiver`, batches them into chunks of
/// `chunk_size`, feeds the chunks to `pipeline` and finalizes it once the
/// channel is disconnected.
///
/// With `flush_timeout == Some(t)`, a partial chunk is flushed whenever no item
/// arrives within `t`; with `None`, the worker blocks until an item arrives or
/// all senders are gone. `items_processed` is incremented as each item is
/// taken off the queue, before its chunk is processed.
///
/// # Errors
/// Returns the first error from `process_chunk` or `finalize`.
fn realtime_worker<T, P>(
    receiver: std::sync::mpsc::Receiver<T>,
    mut pipeline: P,
    chunk_size: usize,
    flush_timeout: Option<Duration>,
    metrics: Arc<SharedMetrics>,
) -> Result<P::Output>
where
    P: StreamingPipeline<T>,
{
    use std::sync::mpsc::RecvTimeoutError;

    let mut pending: Vec<T> = Vec::with_capacity(chunk_size);

    loop {
        // Express both receive modes as one result type so that a single
        // match handles them; the blocking mode can never report `Timeout`.
        let next = match flush_timeout {
            Some(idle) => receiver.recv_timeout(idle),
            None => receiver.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };

        match next {
            Ok(item) => {
                metrics.items_processed.fetch_add(1, Ordering::Relaxed);
                pending.push(item);
                if pending.len() >= chunk_size {
                    pipeline.process_chunk(&pending)?;
                    pending.clear();
                }
            }
            Err(RecvTimeoutError::Timeout) => {
                // Idle period elapsed: flush what has been gathered so far.
                if !pending.is_empty() {
                    pipeline.process_chunk(&pending)?;
                    pending.clear();
                }
            }
            Err(RecvTimeoutError::Disconnected) => break,
        }
    }

    // All senders are gone; flush the remainder before finalizing.
    if !pending.is_empty() {
        pipeline.process_chunk(&pending)?;
    }
    pipeline.finalize()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` points spaced 0.1 apart along the x axis (y = z = 0).
    fn grid_cloud(n: usize) -> PointCloud<Point3f> {
        let pts: Vec<Point3f> = (0..n)
            .map(|i| Point3f::new(i as f32 * 0.1, 0.0, 0.0))
            .collect();
        PointCloud::from_points(pts)
    }

    /// 25 points in chunks of 8 arrive as 8 + 8 + 8 + 1, i.e. four chunks.
    #[test]
    fn test_collector_round_trip() {
        let cloud = grid_cloud(25);
        let mut collector = StreamingCollector::new();
        let stats = run_pipeline(&mut collector, cloud_as_stream(&cloud), 8).unwrap();
        let out = collector.finalize().unwrap();
        assert_eq!(stats.items_processed, 25);
        assert_eq!(stats.chunks_processed, 4);
        assert_eq!(out.len(), 25);
    }

    /// Count, min, max and mean are correct across a chunk boundary.
    #[test]
    fn test_statistics_correctness() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.0, 0.0, 0.0),
            Point3f::new(1.0, 0.0, 0.0),
            Point3f::new(2.0, 0.0, 0.0),
        ]);
        let mut stats_pipe = StreamingStatistics::new();
        run_pipeline(&mut stats_pipe, cloud_as_stream(&cloud), 2).unwrap();
        let s = stats_pipe.finalize().unwrap();
        assert_eq!(s.point_count, 3);
        assert!((s.min.x - 0.0).abs() < 1e-6);
        assert!((s.max.x - 2.0).abs() < 1e-6);
        assert!((s.mean.x - 1.0).abs() < 1e-6);
    }

    /// Finalizing without any input is an error.
    #[test]
    fn test_statistics_empty_fails() {
        // `finalize` consumes the value, so the binding does not need `mut`.
        let stats_pipe = StreamingStatistics::new();
        assert!(stats_pipe.finalize().is_err());
    }

    /// 100 points spanning x in [0, 9.9] with 1.0-sized voxels occupy at most
    /// ten voxels.
    #[test]
    fn test_voxel_filter_reduces_density() {
        let cloud = grid_cloud(100);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 32).unwrap();
        let out = filter.finalize().unwrap();
        assert!(out.len() <= 10, "expected ≤10 voxels, got {}", out.len());
        assert!(!out.is_empty());
    }

    /// Two points in the same voxel are replaced by their centroid.
    #[test]
    fn test_voxel_filter_centroid() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.3, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 10).unwrap();
        let out = filter.finalize().unwrap();
        assert_eq!(out.len(), 1);
        assert!((out.points[0].x - 0.2).abs() < 1e-5);
    }

    /// With a chunk size of 1 the two points arrive in different chunks but
    /// must still be merged into one voxel.
    #[test]
    fn test_voxel_filter_across_chunk_boundary() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.9, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 1).unwrap();
        let out = filter.finalize().unwrap();
        assert_eq!(out.len(), 1, "points in the same voxel across chunks should merge");
        assert!((out.points[0].x - 0.5).abs() < 1e-5);
    }

    /// A negative voxel size is rejected when the first chunk is processed.
    #[test]
    fn test_invalid_voxel_size() {
        let config = StreamingVoxelFilterConfig { voxel_size: -1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        let cloud = PointCloud::from_points(vec![Point3f::new(0.0, 0.0, 0.0)]);
        let result = run_pipeline(&mut filter, cloud_as_stream(&cloud), 1);
        assert!(result.is_err());
    }

    /// With `skip_errors` set, bad items are counted and the rest still flow
    /// through.
    #[test]
    fn test_skip_errors() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
            Ok(Point3f::new(1.0, 0.0, 0.0)),
        ];
        let mut collector = StreamingCollector::new();
        let run_stats = run_pipeline_with_options(
            &mut collector,
            source.into_iter(),
            10,
            &RunOptions { skip_errors: true },
        )
        .unwrap();
        let out = collector.finalize().unwrap();
        assert_eq!(out.len(), 2);
        assert_eq!(run_stats.errors_skipped, 1);
    }

    /// Without `skip_errors`, the first source error aborts the run.
    #[test]
    fn test_error_propagation() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
        ];
        let mut collector = StreamingCollector::new();
        assert!(run_pipeline(&mut collector, source.into_iter(), 10).is_err());
    }

    /// A chunk size of zero is rejected even for an empty source.
    #[test]
    fn test_chunk_size_zero_fails() {
        let mut collector = StreamingCollector::new();
        let result = run_pipeline(
            &mut collector,
            std::iter::empty::<Result<Point3f>>(),
            0,
        );
        assert!(result.is_err());
    }

    /// The voxel filter reports a non-zero memory estimate once it holds voxels.
    #[test]
    fn test_memory_bytes() {
        let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.5 });
        let cloud = grid_cloud(20);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 5).unwrap();
        assert!(filter.memory_bytes() > 0);
    }

    /// Every item sent through the realtime pipeline reaches the collector.
    #[test]
    fn test_realtime_basic_round_trip() {
        let config = BackpressureConfig { max_queue_depth: 32, chunk_size: 8, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..50_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 50);
    }

    /// The chunk size (100) is never reached, so the 20 items can only arrive
    /// through timeout flushes or the final flush.
    #[test]
    fn test_realtime_with_flush_timeout() {
        let config = BackpressureConfig {
            max_queue_depth: 64,
            chunk_size: 100,
            flush_timeout: Some(Duration::from_millis(5)),
        };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..20_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 20);
    }

    /// The voxel filter behaves the same when driven by the realtime worker.
    #[test]
    fn test_realtime_voxel_filter() {
        let filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 1.0 });
        let config = BackpressureConfig { max_queue_depth: 64, chunk_size: 16, flush_timeout: None };
        let rt = RealtimePipeline::new(filter, config);
        for i in 0..100_u32 {
            rt.send(Point3f::new(i as f32 * 0.1, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert!(cloud.len() <= 10, "expected ≤10 voxels, got {}", cloud.len());
        assert!(!cloud.is_empty());
    }

    /// `send` counts every accepted item and never records a drop.
    #[test]
    fn test_realtime_metrics_queued_count() {
        let config = BackpressureConfig { max_queue_depth: 64, chunk_size: 32, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..20_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let m = rt.metrics();
        assert_eq!(m.items_queued, 20);
        assert_eq!(m.items_dropped, 0);
        rt.finish().unwrap();
    }

    /// `try_send` accepts an item when the queue has room.
    #[test]
    fn test_realtime_try_send_accepts_when_space() {
        let config = BackpressureConfig { max_queue_depth: 16, chunk_size: 8, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        let accepted = rt.try_send(Point3f::new(1.0, 0.0, 0.0)).unwrap();
        assert!(accepted, "should accept item when queue has space");
        let m = rt.metrics();
        assert_eq!(m.items_queued, 1);
        assert_eq!(m.items_dropped, 0);
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 1);
    }

    /// `try_send` reports drops while the worker is stalled and the
    /// single-slot queue is full.
    #[test]
    fn test_realtime_try_send_drops_when_full() {
        use std::sync::{Condvar, Mutex};

        /// Collector whose first `process_chunk` call blocks until the latch
        /// is released, which keeps the worker from draining the queue.
        struct LatchedCollector {
            latch: Arc<(Mutex<bool>, Condvar)>,
            inner: StreamingCollector,
            blocked: bool,
        }

        impl StreamingPipeline<Point3f> for LatchedCollector {
            type Output = PointCloud<Point3f>;

            fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
                if !self.blocked {
                    self.blocked = true;
                    let (lock, cv) = &*self.latch;
                    let mut released = lock.lock().unwrap();
                    while !*released {
                        released = cv.wait(released).unwrap();
                    }
                }
                self.inner.process_chunk(chunk)
            }

            fn finalize(self) -> Result<PointCloud<Point3f>> {
                self.inner.finalize()
            }
        }

        let latch = Arc::new((Mutex::new(false), Condvar::new()));
        let latch_release = Arc::clone(&latch);
        let config = BackpressureConfig { max_queue_depth: 1, chunk_size: 1, flush_timeout: None };
        let rt = RealtimePipeline::new(
            LatchedCollector { latch, inner: StreamingCollector::new(), blocked: false },
            config,
        );

        // Give the worker time to pick up the first item and block on the latch.
        rt.send(Point3f::new(0.0, 0.0, 0.0)).unwrap();
        std::thread::sleep(Duration::from_millis(20));

        let mut accepted = 0usize;
        let mut dropped = 0usize;
        for i in 1..=8_u32 {
            if rt.try_send(Point3f::new(i as f32, 0.0, 0.0)).unwrap() {
                accepted += 1;
            } else {
                dropped += 1;
            }
        }
        assert!(dropped > 0, "expected at least one drop with max_queue_depth=1");

        // Release the worker so the queue can drain and `finish` can return.
        let (lock, cv) = &*latch_release;
        *lock.lock().unwrap() = true;
        cv.notify_all();

        let total_dropped = rt.metrics().items_dropped;
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 1 + accepted);
        assert_eq!(total_dropped, dropped as u64);
    }

    /// Dropping the pipeline without calling `finish` must not hang or panic.
    #[test]
    fn test_realtime_drop_without_finish() {
        let config = BackpressureConfig { max_queue_depth: 16, chunk_size: 4, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..10_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        drop(rt);
    }

    /// Sending far more items than the queue holds loses nothing, because
    /// `send` waits for space.
    #[test]
    fn test_realtime_large_workload() {
        let config = BackpressureConfig {
            max_queue_depth: 512,
            chunk_size: 128,
            flush_timeout: None,
        };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        const N: u32 = 10_000;
        for i in 0..N {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), N as usize);
    }
}