Skip to main content

threecrate_algorithms/
streaming.rs

1//! Streaming point cloud processing pipeline.
2//!
3//! Enables out-of-core processing of arbitrarily large point clouds by reading
4//! and processing data in bounded-size chunks.  Only one chunk resides in RAM at
5//! a time; the pipeline accumulates lightweight per-chunk state (e.g. a voxel map)
6//! that is orders of magnitude smaller than the full dataset.
7//!
8//! # Architecture
9//!
10//! ```text
11//! Source iterator                Pipeline stage           Output
12//! (file / network / …)  ──►  process_chunk(&[T])  ──►  finalize()
13//!      chunk 0                  accumulate state
14//!      chunk 1                  accumulate state
15//!      …                        …
16//!      chunk N                  accumulate state
17//! ```
18//!
19//! For real-time sources (sensors, network streams), [`RealtimePipeline`] wraps
20//! any [`StreamingPipeline`] with a bounded input queue and a background worker
21//! thread.  When the queue is full the producer is automatically throttled
22//! (backpressure); [`RealtimePipeline::try_send`] provides a non-blocking
23//! variant that drops items instead of blocking.
24//!
25//! # Provided pipelines
26//!
27//! | Type | Description |
28//! |---|---|
29//! | [`StreamingVoxelFilter`] | Downsamples via a voxel grid; O(voxels) memory |
30//! | [`StreamingStatistics`] | Accumulates bounding-box and point count |
31//! | [`StreamingCollector`] | Collects all points (useful for testing) |
32//! | [`RealtimePipeline`] | Drives any pipeline from a background thread with bounded-queue backpressure |
33//!
34//! # Example
35//!
36//! ```rust
37//! use threecrate_algorithms::streaming::{
38//!     StreamingPipeline, StreamingVoxelFilter, StreamingVoxelFilterConfig, run_pipeline,
39//! };
40//! use threecrate_core::Point3f;
41//!
42//! let points: Vec<Result<Point3f, _>> = vec![
43//!     Ok(Point3f::new(0.0, 0.0, 0.0)),
44//!     Ok(Point3f::new(0.05, 0.0, 0.0)),
45//!     Ok(Point3f::new(1.0, 1.0, 1.0)),
46//! ];
47//! let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.1 });
48//! let stats = run_pipeline(&mut filter, points.into_iter(), 2).unwrap();
49//! let cloud = filter.finalize().unwrap();
50//! println!("Downsampled to {} points", cloud.len());
51//! ```
52
53use std::collections::HashMap;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
57use std::thread::{self, JoinHandle};
58use std::time::Duration;
59use threecrate_core::{Error, Point3f, PointCloud, Result};
60
61// ---------------------------------------------------------------------------
62// Core trait
63// ---------------------------------------------------------------------------
64
65/// Trait for chunk-based streaming processors.
66///
67/// Implementations accumulate state across calls to [`process_chunk`] and
68/// produce a final result via [`finalize`].  The chunk size controls peak RAM
69/// usage: smaller chunks use less memory at the cost of more function-call
70/// overhead.
71///
72/// [`process_chunk`]: StreamingPipeline::process_chunk
73/// [`finalize`]: StreamingPipeline::finalize
74pub trait StreamingPipeline<T> {
75    /// The type produced after all chunks have been processed.
76    type Output;
77
78    /// Ingest one chunk of items.  Called repeatedly until the source is
79    /// exhausted.  `chunk` will never be empty.
80    fn process_chunk(&mut self, chunk: &[T]) -> Result<()>;
81
82    /// Consume the pipeline and return the accumulated output.
83    fn finalize(self) -> Result<Self::Output>;
84
85    /// Estimated number of bytes currently held by this pipeline stage.
86    /// Default returns `0`; override to expose real memory usage.
87    fn memory_bytes(&self) -> usize { 0 }
88}
89
90// ---------------------------------------------------------------------------
91// Pipeline runner
92// ---------------------------------------------------------------------------
93
/// Counters returned by [`run_pipeline`] and [`run_pipeline_with_options`].
///
/// All counters start at zero (see the derived [`Default`]).
#[derive(Clone, Debug, Default)]
pub struct RunStats {
    /// How many items were handed to the pipeline successfully.
    pub items_processed: usize,
    /// How many chunks were delivered to the pipeline.
    pub chunks_processed: usize,
    /// How many erroneous source items were skipped (only non-zero when
    /// `skip_errors` is set).
    pub errors_skipped: usize,
}
104
/// Options for [`run_pipeline`] / [`run_pipeline_with_options`].
///
/// The derived [`Default`] leaves every flag off, i.e. `skip_errors` is
/// `false`, which is what the previous hand-written impl produced.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// If `true`, item-level errors from the source iterator are counted and
    /// skipped rather than causing [`run_pipeline`] to return early.
    /// Default: `false`.
    pub skip_errors: bool,
}
117
118/// Drive `pipeline` by reading from `source` in chunks of `chunk_size` items.
119///
120/// Returns [`RunStats`] on success.  The iterator's item errors are propagated
121/// unless [`RunOptions::skip_errors`] is set.
122///
123/// # Arguments
124/// * `pipeline`   – A mutable reference to a [`StreamingPipeline`].
125/// * `source`     – Any iterator whose items are `Result<T>`.
126/// * `chunk_size` – Number of items to accumulate before calling
127///                  [`StreamingPipeline::process_chunk`].  Must be ≥ 1.
128pub fn run_pipeline<T, P>(
129    pipeline: &mut P,
130    source: impl Iterator<Item = Result<T>>,
131    chunk_size: usize,
132) -> Result<RunStats>
133where
134    P: StreamingPipeline<T>,
135{
136    run_pipeline_with_options(pipeline, source, chunk_size, &RunOptions::default())
137}
138
139/// Like [`run_pipeline`] but accepts explicit options.
140pub fn run_pipeline_with_options<T, P>(
141    pipeline: &mut P,
142    source: impl Iterator<Item = Result<T>>,
143    chunk_size: usize,
144    opts: &RunOptions,
145) -> Result<RunStats>
146where
147    P: StreamingPipeline<T>,
148{
149    if chunk_size == 0 {
150        return Err(Error::InvalidData("chunk_size must be ≥ 1".into()));
151    }
152
153    let mut stats = RunStats::default();
154    let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
155
156    for item in source {
157        match item {
158            Ok(point) => {
159                chunk.push(point);
160                if chunk.len() == chunk_size {
161                    pipeline.process_chunk(&chunk)?;
162                    stats.items_processed += chunk.len();
163                    stats.chunks_processed += 1;
164                    chunk.clear();
165                }
166            }
167            Err(e) => {
168                if opts.skip_errors {
169                    stats.errors_skipped += 1;
170                } else {
171                    return Err(e);
172                }
173            }
174        }
175    }
176
177    // Flush any remaining items.
178    if !chunk.is_empty() {
179        pipeline.process_chunk(&chunk)?;
180        stats.items_processed += chunk.len();
181        stats.chunks_processed += 1;
182    }
183
184    Ok(stats)
185}
186
187// ---------------------------------------------------------------------------
188// StreamingVoxelFilter
189// ---------------------------------------------------------------------------
190
/// Settings for a [`StreamingVoxelFilter`].
#[derive(Clone, Debug)]
pub struct StreamingVoxelFilterConfig {
    /// Edge length of one cubic voxel, expressed in the same units as the
    /// point coordinates.  Has to be positive.
    pub voxel_size: f32,
}
198
199/// Streaming voxel-grid downsampler.
200///
201/// Maintains a [`HashMap`] from voxel coordinates to a representative point.
202/// Peak memory is `O(V)` where `V` is the number of occupied voxels in the
203/// entire dataset — typically far smaller than N points.
204///
205/// Unlike the in-memory [`voxel_grid_filter`](crate::filtering::voxel_grid_filter),
206/// no bounding-box pre-scan is required; voxel keys are derived by dividing
207/// each coordinate by `voxel_size` and rounding toward negative infinity, so
208/// they are consistent across all chunks.
209///
210/// The representative point for each voxel is the **centroid** of all points
211/// assigned to that voxel, giving a smoother result than first-point selection.
212pub struct StreamingVoxelFilter {
213    config: StreamingVoxelFilterConfig,
214    /// Accumulated sum and count for centroid computation.
215    voxels: HashMap<(i32, i32, i32), ([f64; 3], u32)>,
216}
217
218impl StreamingVoxelFilter {
219    /// Create a new streaming voxel filter.
220    pub fn new(config: StreamingVoxelFilterConfig) -> Self {
221        Self { config, voxels: HashMap::new() }
222    }
223
224    #[inline]
225    fn voxel_key(&self, p: &Point3f) -> (i32, i32, i32) {
226        let inv = 1.0 / self.config.voxel_size;
227        (
228            (p.x * inv).floor() as i32,
229            (p.y * inv).floor() as i32,
230            (p.z * inv).floor() as i32,
231        )
232    }
233
234    /// Number of occupied voxels accumulated so far.
235    pub fn voxel_count(&self) -> usize { self.voxels.len() }
236}
237
238impl StreamingPipeline<Point3f> for StreamingVoxelFilter {
239    type Output = PointCloud<Point3f>;
240
241    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
242        if self.config.voxel_size <= 0.0 {
243            return Err(Error::InvalidData("voxel_size must be positive".into()));
244        }
245        for p in chunk {
246            let key = self.voxel_key(p);
247            let entry = self.voxels.entry(key).or_insert(([0.0; 3], 0));
248            entry.0[0] += p.x as f64;
249            entry.0[1] += p.y as f64;
250            entry.0[2] += p.z as f64;
251            entry.1 += 1;
252        }
253        Ok(())
254    }
255
256    fn finalize(self) -> Result<PointCloud<Point3f>> {
257        let points: Vec<Point3f> = self
258            .voxels
259            .values()
260            .map(|(sum, count)| {
261                let n = *count as f64;
262                Point3f::new((sum[0] / n) as f32, (sum[1] / n) as f32, (sum[2] / n) as f32)
263            })
264            .collect();
265        Ok(PointCloud::from_points(points))
266    }
267
268    fn memory_bytes(&self) -> usize {
269        // Each entry: key (12 bytes) + value (28 bytes) + HashMap overhead (~50 bytes).
270        self.voxels.len() * 90
271    }
272}
273
274// ---------------------------------------------------------------------------
275// StreamingStatistics
276// ---------------------------------------------------------------------------
277
278/// Accumulated statistics produced by [`StreamingStatistics`].
279#[derive(Debug, Clone)]
280pub struct PointCloudStats {
281    /// Total number of points processed.
282    pub point_count: u64,
283    /// Minimum coordinate (axis-aligned bounding box corner).
284    pub min: Point3f,
285    /// Maximum coordinate (axis-aligned bounding box corner).
286    pub max: Point3f,
287    /// Per-axis mean coordinates.
288    pub mean: Point3f,
289}
290
/// Single-pass statistics accumulator.
///
/// Tracks the bounding box, the number of points and the coordinate sums
/// (for the mean) without storing any individual point, so its footprint is
/// `O(1)`.
pub struct StreamingStatistics {
    /// Number of points seen so far.
    count: u64,
    /// Running per-axis minimum.
    min: [f32; 3],
    /// Running per-axis maximum.
    max: [f32; 3],
    /// Running per-axis coordinate sum, kept in `f64`.
    sum: [f64; 3],
}
301
302impl StreamingStatistics {
303    /// Create a new statistics collector.
304    pub fn new() -> Self {
305        Self {
306            count: 0,
307            min: [f32::INFINITY; 3],
308            max: [f32::NEG_INFINITY; 3],
309            sum: [0.0; 3],
310        }
311    }
312}
313
314impl Default for StreamingStatistics {
315    fn default() -> Self { Self::new() }
316}
317
318impl StreamingPipeline<Point3f> for StreamingStatistics {
319    type Output = PointCloudStats;
320
321    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
322        for p in chunk {
323            self.count += 1;
324            self.min[0] = self.min[0].min(p.x);
325            self.min[1] = self.min[1].min(p.y);
326            self.min[2] = self.min[2].min(p.z);
327            self.max[0] = self.max[0].max(p.x);
328            self.max[1] = self.max[1].max(p.y);
329            self.max[2] = self.max[2].max(p.z);
330            self.sum[0] += p.x as f64;
331            self.sum[1] += p.y as f64;
332            self.sum[2] += p.z as f64;
333        }
334        Ok(())
335    }
336
337    fn finalize(self) -> Result<PointCloudStats> {
338        if self.count == 0 {
339            return Err(Error::InvalidData("no points were processed".into()));
340        }
341        let n = self.count as f64;
342        Ok(PointCloudStats {
343            point_count: self.count,
344            min: Point3f::new(self.min[0], self.min[1], self.min[2]),
345            max: Point3f::new(self.max[0], self.max[1], self.max[2]),
346            mean: Point3f::new(
347                (self.sum[0] / n) as f32,
348                (self.sum[1] / n) as f32,
349                (self.sum[2] / n) as f32,
350            ),
351        })
352    }
353
354    fn memory_bytes(&self) -> usize { std::mem::size_of::<Self>() }
355}
356
357// ---------------------------------------------------------------------------
358// StreamingCollector
359// ---------------------------------------------------------------------------
360
361/// Streaming pipeline stage that collects all points into a `PointCloud`.
362///
363/// Useful for testing or as a terminal stage when the full cloud must
364/// eventually be materialized (e.g. after prior stages have filtered it down).
365pub struct StreamingCollector {
366    points: Vec<Point3f>,
367}
368
369impl StreamingCollector {
370    /// Create a new collector.
371    pub fn new() -> Self { Self { points: Vec::new() } }
372
373    /// Create a collector with pre-allocated capacity.
374    pub fn with_capacity(cap: usize) -> Self {
375        Self { points: Vec::with_capacity(cap) }
376    }
377}
378
379impl Default for StreamingCollector {
380    fn default() -> Self { Self::new() }
381}
382
383impl StreamingPipeline<Point3f> for StreamingCollector {
384    type Output = PointCloud<Point3f>;
385
386    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
387        self.points.extend_from_slice(chunk);
388        Ok(())
389    }
390
391    fn finalize(self) -> Result<PointCloud<Point3f>> {
392        Ok(PointCloud::from_points(self.points))
393    }
394
395    fn memory_bytes(&self) -> usize {
396        self.points.len() * std::mem::size_of::<Point3f>()
397    }
398}
399
400// ---------------------------------------------------------------------------
401// Streaming source helpers
402// ---------------------------------------------------------------------------
403
404/// Wrap a `PointCloud` as a streaming source of `Result<Point3f>`.
405///
406/// Useful for testing pipelines without a file on disk.
407pub fn cloud_as_stream(
408    cloud: &PointCloud<Point3f>,
409) -> impl Iterator<Item = Result<Point3f>> + '_ {
410    cloud.points.iter().copied().map(Ok)
411}
412
413// ---------------------------------------------------------------------------
414// Real-time streaming pipeline with backpressure
415// ---------------------------------------------------------------------------
416
/// Tuning knobs for a [`RealtimePipeline`].
#[derive(Clone, Debug)]
pub struct BackpressureConfig {
    /// Capacity of the input queue.  Once this many items are buffered,
    /// [`RealtimePipeline::send`] blocks, whereas
    /// [`RealtimePipeline::try_send`] drops the item and bumps
    /// [`RealtimeMetrics::items_dropped`].
    pub max_queue_depth: usize,
    /// How many items the worker gathers before it calls
    /// [`StreamingPipeline::process_chunk`].
    pub chunk_size: usize,
    /// Maximum time the worker waits for the next item before it flushes a
    /// partially filled chunk.
    ///
    /// With `Some(d)`, a partial chunk is flushed once no item has arrived
    /// for `d`, trading occasional small chunks for lower latency.  With
    /// `None`, a chunk is flushed only when it is full or the input has been
    /// closed, which suits bulk/batch workloads better.
    pub flush_timeout: Option<Duration>,
}
433
434impl Default for BackpressureConfig {
435    fn default() -> Self {
436        Self {
437            max_queue_depth: 1024,
438            chunk_size: 256,
439            flush_timeout: Some(Duration::from_millis(10)),
440        }
441    }
442}
443
/// Point-in-time copy of the counters of a [`RealtimePipeline`].
#[derive(Clone, Debug, Default)]
pub struct RealtimeMetrics {
    /// Items accepted into the queue so far, via `send` or `try_send`.
    pub items_queued: u64,
    /// Items the background worker has taken off the queue so far.
    pub items_processed: u64,
    /// Items discarded because the queue was full (`try_send` only).
    pub items_dropped: u64,
    /// Approximate number of items still waiting in the queue
    /// (`items_queued − items_processed`, clamped at zero).
    pub estimated_queue_depth: u64,
}
456
/// Counters shared between the producer side of a `RealtimePipeline` and its
/// worker thread.
struct SharedMetrics {
    /// Incremented by the producer after an item was accepted by the queue.
    items_queued: AtomicU64,
    /// Incremented by the worker for every item it dequeues.
    items_processed: AtomicU64,
    /// Incremented by the producer when `try_send` finds the queue full.
    items_dropped: AtomicU64,
}
462
463impl SharedMetrics {
464    fn new() -> Arc<Self> {
465        Arc::new(Self {
466            items_queued: AtomicU64::new(0),
467            items_processed: AtomicU64::new(0),
468            items_dropped: AtomicU64::new(0),
469        })
470    }
471
472    fn snapshot(&self) -> RealtimeMetrics {
473        let queued = self.items_queued.load(Ordering::Relaxed);
474        let processed = self.items_processed.load(Ordering::Relaxed);
475        let dropped = self.items_dropped.load(Ordering::Relaxed);
476        RealtimeMetrics {
477            items_queued: queued,
478            items_processed: processed,
479            items_dropped: dropped,
480            estimated_queue_depth: queued.saturating_sub(processed),
481        }
482    }
483}
484
485/// Real-time streaming pipeline with backpressure.
486///
487/// Wraps any [`StreamingPipeline`] with a bounded input queue and drives it
488/// from a background worker thread.  Flow control works as follows:
489///
490/// - **[`send`]** blocks the caller when the queue is full — the producer is
491///   naturally throttled without any explicit rate-limiter code.
492/// - **[`try_send`]** never blocks; it drops the item and increments
493///   [`RealtimeMetrics::items_dropped`] when the queue is full.
494/// - Call **[`finish`]** to close the input, drain remaining items, join the
495///   worker, and retrieve the pipeline's final output.
496/// - Dropping the pipeline without calling `finish` is safe: the channel is
497///   closed and the worker is joined in the `Drop` impl (result discarded).
498///
499/// [`send`]: RealtimePipeline::send
500/// [`try_send`]: RealtimePipeline::try_send
501/// [`finish`]: RealtimePipeline::finish
502///
503/// # Example
504///
505/// ```rust
506/// use threecrate_algorithms::streaming::{
507///     StreamingCollector, BackpressureConfig, RealtimePipeline,
508/// };
509/// use threecrate_core::Point3f;
510///
511/// let config = BackpressureConfig { max_queue_depth: 64, chunk_size: 16, ..Default::default() };
512/// let rt = RealtimePipeline::new(StreamingCollector::new(), config);
513/// for i in 0..50_u32 {
514///     rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
515/// }
516/// let cloud = rt.finish().unwrap();
517/// assert_eq!(cloud.len(), 50);
518/// ```
519pub struct RealtimePipeline<T: Send + 'static, O: Send + 'static> {
520    sender: Option<SyncSender<T>>,
521    metrics: Arc<SharedMetrics>,
522    join_handle: Option<JoinHandle<Result<O>>>,
523}
524
525impl<T: Send + 'static, O: Send + 'static> RealtimePipeline<T, O> {
526    /// Create a new real-time pipeline backed by `pipeline`.
527    ///
528    /// The worker thread starts immediately and is ready to receive items.
529    pub fn new<P>(pipeline: P, config: BackpressureConfig) -> Self
530    where
531        P: StreamingPipeline<T, Output = O> + Send + 'static,
532    {
533        assert!(config.chunk_size >= 1, "chunk_size must be ≥ 1");
534        assert!(config.max_queue_depth >= 1, "max_queue_depth must be ≥ 1");
535
536        let (sender, receiver) = sync_channel::<T>(config.max_queue_depth);
537        let metrics = SharedMetrics::new();
538        let metrics_worker = Arc::clone(&metrics);
539        let chunk_size = config.chunk_size;
540        let flush_timeout = config.flush_timeout;
541
542        let join_handle = thread::spawn(move || {
543            realtime_worker(receiver, pipeline, chunk_size, flush_timeout, metrics_worker)
544        });
545
546        Self { sender: Some(sender), metrics, join_handle: Some(join_handle) }
547    }
548
549    /// Send an item, blocking until queue space is available (backpressure).
550    ///
551    /// Returns `Err` if the worker thread has unexpectedly terminated.
552    pub fn send(&self, item: T) -> Result<()> {
553        let sender = self
554            .sender
555            .as_ref()
556            .ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
557        sender
558            .send(item)
559            .map_err(|_| Error::InvalidData("pipeline worker has terminated".into()))?;
560        self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
561        Ok(())
562    }
563
564    /// Try to send without blocking.
565    ///
566    /// Returns `Ok(true)` when the item was queued, `Ok(false)` when the queue
567    /// was full and the item was dropped (counted in
568    /// [`RealtimeMetrics::items_dropped`]).  Returns `Err` if the worker has
569    /// terminated unexpectedly.
570    pub fn try_send(&self, item: T) -> Result<bool> {
571        let sender = self
572            .sender
573            .as_ref()
574            .ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
575        match sender.try_send(item) {
576            Ok(()) => {
577                self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
578                Ok(true)
579            }
580            Err(TrySendError::Full(_)) => {
581                self.metrics.items_dropped.fetch_add(1, Ordering::Relaxed);
582                Ok(false)
583            }
584            Err(TrySendError::Disconnected(_)) => {
585                Err(Error::InvalidData("pipeline worker has terminated".into()))
586            }
587        }
588    }
589
590    /// Snapshot current pipeline metrics.
591    pub fn metrics(&self) -> RealtimeMetrics {
592        self.metrics.snapshot()
593    }
594
595    /// Close the input queue, wait for the worker to drain all buffered items,
596    /// and return the pipeline's final output.
597    pub fn finish(mut self) -> Result<O> {
598        self.sender = None;
599        self.join_handle
600            .take()
601            .expect("pipeline already finished")
602            .join()
603            .map_err(|_| Error::InvalidData("pipeline worker panicked".into()))?
604    }
605}
606
607impl<T: Send + 'static, O: Send + 'static> Drop for RealtimePipeline<T, O> {
608    fn drop(&mut self) {
609        self.sender = None;
610        if let Some(handle) = self.join_handle.take() {
611            let _ = handle.join();
612        }
613    }
614}
615
616fn realtime_worker<T, P>(
617    receiver: std::sync::mpsc::Receiver<T>,
618    mut pipeline: P,
619    chunk_size: usize,
620    flush_timeout: Option<Duration>,
621    metrics: Arc<SharedMetrics>,
622) -> Result<P::Output>
623where
624    P: StreamingPipeline<T>,
625{
626    let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
627
628    match flush_timeout {
629        None => {
630            for item in receiver {
631                metrics.items_processed.fetch_add(1, Ordering::Relaxed);
632                chunk.push(item);
633                if chunk.len() >= chunk_size {
634                    pipeline.process_chunk(&chunk)?;
635                    chunk.clear();
636                }
637            }
638        }
639        Some(timeout) => {
640            use std::sync::mpsc::RecvTimeoutError;
641            loop {
642                match receiver.recv_timeout(timeout) {
643                    Ok(item) => {
644                        metrics.items_processed.fetch_add(1, Ordering::Relaxed);
645                        chunk.push(item);
646                        if chunk.len() >= chunk_size {
647                            pipeline.process_chunk(&chunk)?;
648                            chunk.clear();
649                        }
650                    }
651                    Err(RecvTimeoutError::Timeout) => {
652                        if !chunk.is_empty() {
653                            pipeline.process_chunk(&chunk)?;
654                            chunk.clear();
655                        }
656                    }
657                    Err(RecvTimeoutError::Disconnected) => break,
658                }
659            }
660        }
661    }
662
663    if !chunk.is_empty() {
664        pipeline.process_chunk(&chunk)?;
665    }
666
667    pipeline.finalize()
668}
669
670// ---------------------------------------------------------------------------
671// Tests
672// ---------------------------------------------------------------------------
673
674#[cfg(test)]
675mod tests {
676    use super::*;
677
678    fn grid_cloud(n: usize) -> PointCloud<Point3f> {
679        let pts: Vec<Point3f> = (0..n)
680            .map(|i| Point3f::new(i as f32 * 0.1, 0.0, 0.0))
681            .collect();
682        PointCloud::from_points(pts)
683    }
684
685    // ---- StreamingCollector -----------------------------------------------
686
687    #[test]
688    fn test_collector_round_trip() {
689        let cloud = grid_cloud(25);
690        let mut collector = StreamingCollector::new();
691        let stats = run_pipeline(&mut collector, cloud_as_stream(&cloud), 8).unwrap();
692        let out = collector.finalize().unwrap();
693
694        assert_eq!(stats.items_processed, 25);
695        assert_eq!(stats.chunks_processed, 4); // 8+8+8+1
696        assert_eq!(out.len(), 25);
697    }
698
699    // ---- StreamingStatistics ----------------------------------------------
700
701    #[test]
702    fn test_statistics_correctness() {
703        // Three points: (0,0,0), (1,0,0), (2,0,0)
704        let cloud = PointCloud::from_points(vec![
705            Point3f::new(0.0, 0.0, 0.0),
706            Point3f::new(1.0, 0.0, 0.0),
707            Point3f::new(2.0, 0.0, 0.0),
708        ]);
709        let mut stats_pipe = StreamingStatistics::new();
710        run_pipeline(&mut stats_pipe, cloud_as_stream(&cloud), 2).unwrap();
711        let s = stats_pipe.finalize().unwrap();
712
713        assert_eq!(s.point_count, 3);
714        assert!((s.min.x - 0.0).abs() < 1e-6);
715        assert!((s.max.x - 2.0).abs() < 1e-6);
716        assert!((s.mean.x - 1.0).abs() < 1e-6);
717    }
718
719    #[test]
720    fn test_statistics_empty_fails() {
721        let mut stats_pipe = StreamingStatistics::new();
722        // No chunks processed — finalize should error.
723        assert!(stats_pipe.finalize().is_err());
724    }
725
726    // ---- StreamingVoxelFilter ---------------------------------------------
727
728    #[test]
729    fn test_voxel_filter_reduces_density() {
730        // 100 points at x=0..9.9 in steps of 0.1 → should collapse to ~10 voxels
731        // with voxel_size=1.0.
732        let cloud = grid_cloud(100);
733        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
734        let mut filter = StreamingVoxelFilter::new(config);
735        run_pipeline(&mut filter, cloud_as_stream(&cloud), 32).unwrap();
736        let out = filter.finalize().unwrap();
737
738        assert!(out.len() <= 10, "expected ≤10 voxels, got {}", out.len());
739        assert!(!out.is_empty());
740    }
741
742    #[test]
743    fn test_voxel_filter_centroid() {
744        // Two points in the same voxel → centroid should be their midpoint.
745        let cloud = PointCloud::from_points(vec![
746            Point3f::new(0.1, 0.0, 0.0),
747            Point3f::new(0.3, 0.0, 0.0),
748        ]);
749        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
750        let mut filter = StreamingVoxelFilter::new(config);
751        run_pipeline(&mut filter, cloud_as_stream(&cloud), 10).unwrap();
752        let out = filter.finalize().unwrap();
753
754        assert_eq!(out.len(), 1);
755        assert!((out.points[0].x - 0.2).abs() < 1e-5);
756    }
757
758    #[test]
759    fn test_voxel_filter_across_chunk_boundary() {
760        // The two points that belong to the same voxel are split across chunks.
761        // The filter must still merge them.
762        let cloud = PointCloud::from_points(vec![
763            Point3f::new(0.1, 0.0, 0.0),
764            Point3f::new(0.9, 0.0, 0.0),
765        ]);
766        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
767        let mut filter = StreamingVoxelFilter::new(config);
768        // chunk_size=1 forces each point into its own chunk.
769        run_pipeline(&mut filter, cloud_as_stream(&cloud), 1).unwrap();
770        let out = filter.finalize().unwrap();
771
772        assert_eq!(out.len(), 1, "points in the same voxel across chunks should merge");
773        assert!((out.points[0].x - 0.5).abs() < 1e-5);
774    }
775
776    #[test]
777    fn test_invalid_voxel_size() {
778        let config = StreamingVoxelFilterConfig { voxel_size: -1.0 };
779        let mut filter = StreamingVoxelFilter::new(config);
780        let cloud = PointCloud::from_points(vec![Point3f::new(0.0, 0.0, 0.0)]);
781        let result = run_pipeline(&mut filter, cloud_as_stream(&cloud), 1);
782        assert!(result.is_err());
783    }
784
785    // ---- run_pipeline options --------------------------------------------
786
787    #[test]
788    fn test_skip_errors() {
789        let source: Vec<Result<Point3f>> = vec![
790            Ok(Point3f::new(0.0, 0.0, 0.0)),
791            Err(Error::InvalidData("bad point".into())),
792            Ok(Point3f::new(1.0, 0.0, 0.0)),
793        ];
794        let mut collector = StreamingCollector::new();
795        let run_stats = run_pipeline_with_options(
796            &mut collector,
797            source.into_iter(),
798            10,
799            &RunOptions { skip_errors: true },
800        )
801        .unwrap();
802        let out = collector.finalize().unwrap();
803        assert_eq!(out.len(), 2);
804        assert_eq!(run_stats.errors_skipped, 1);
805    }
806
807    #[test]
808    fn test_error_propagation() {
809        let source: Vec<Result<Point3f>> = vec![
810            Ok(Point3f::new(0.0, 0.0, 0.0)),
811            Err(Error::InvalidData("bad point".into())),
812        ];
813        let mut collector = StreamingCollector::new();
814        // Default options: errors propagate.
815        assert!(run_pipeline(&mut collector, source.into_iter(), 10).is_err());
816    }
817
818    #[test]
819    fn test_chunk_size_zero_fails() {
820        let mut collector = StreamingCollector::new();
821        let result = run_pipeline(
822            &mut collector,
823            std::iter::empty::<Result<Point3f>>(),
824            0,
825        );
826        assert!(result.is_err());
827    }
828
829    #[test]
830    fn test_memory_bytes() {
831        let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.5 });
832        let cloud = grid_cloud(20);
833        run_pipeline(&mut filter, cloud_as_stream(&cloud), 5).unwrap();
834        // At least some voxels should be occupied and memory should be > 0.
835        assert!(filter.memory_bytes() > 0);
836    }
837
838    // ---- RealtimePipeline ---------------------------------------------------
839
/// Every point sent through a realtime collector pipeline must come back out
/// of `finish()`.
#[test]
fn test_realtime_basic_round_trip() {
    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig { max_queue_depth: 32, chunk_size: 8, flush_timeout: None },
    );

    (0..50_u32).for_each(|x| {
        rt.send(Point3f::new(x as f32, 0.0, 0.0)).unwrap();
    });

    assert_eq!(rt.finish().unwrap().len(), 50);
}
850
/// Sends fewer points than one full chunk with a flush timeout configured
/// and checks that all of them are still present after `finish()`.
#[test]
fn test_realtime_with_flush_timeout() {
    let flush_after = Duration::from_millis(5);
    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig {
            max_queue_depth: 64,
            chunk_size: 100,
            flush_timeout: Some(flush_after),
        },
    );

    // 20 points never fill a 100-point chunk, so only a partial chunk exists.
    for x in (0..20_u32).map(|i| i as f32) {
        rt.send(Point3f::new(x, 0.0, 0.0)).unwrap();
    }

    // finish() waits for the worker; the partial chunk must not be lost.
    let collected = rt.finish().unwrap();
    assert_eq!(collected.len(), 20);
}
866
/// Drives a voxel filter through the realtime wrapper: 100 points spaced 0.1
/// apart along x with a voxel size of 1.0 must collapse to at most 10 points.
#[test]
fn test_realtime_voxel_filter() {
    let rt = RealtimePipeline::new(
        StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 1.0 }),
        BackpressureConfig { max_queue_depth: 64, chunk_size: 16, flush_timeout: None },
    );

    for step in 0..100_u32 {
        let x = step as f32 * 0.1;
        rt.send(Point3f::new(x, 0.0, 0.0)).unwrap();
    }

    let reduced = rt.finish().unwrap();
    let count = reduced.len();
    assert!(count <= 10, "expected ≤10 voxels, got {}", count);
    assert!(!reduced.is_empty());
}
879
/// After 20 blocking sends, the metrics must report 20 queued items and no
/// drops.
#[test]
fn test_realtime_metrics_queued_count() {
    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig { max_queue_depth: 64, chunk_size: 32, flush_timeout: None },
    );

    (0..20_u32)
        .map(|i| Point3f::new(i as f32, 0.0, 0.0))
        .for_each(|point| {
            rt.send(point).unwrap();
        });

    let snapshot = rt.metrics();
    assert_eq!(snapshot.items_queued, 20);
    assert_eq!(snapshot.items_dropped, 0);

    rt.finish().unwrap();
}
892
/// `try_send` on a pipeline with free queue space must accept the item,
/// count it as queued, and deliver it to the output.
#[test]
fn test_realtime_try_send_accepts_when_space() {
    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig { max_queue_depth: 16, chunk_size: 8, flush_timeout: None },
    );

    assert!(
        rt.try_send(Point3f::new(1.0, 0.0, 0.0)).unwrap(),
        "should accept item when queue has space"
    );

    let counters = rt.metrics();
    assert_eq!(counters.items_queued, 1);
    assert_eq!(counters.items_dropped, 0);

    assert_eq!(rt.finish().unwrap().len(), 1);
}
905
/// `try_send` must drop items (returning `false`) once the bounded queue is
/// full, and the drop counter in the metrics must match the number of
/// rejected sends.
#[test]
fn test_realtime_try_send_drops_when_full() {
    use std::sync::{Condvar, Mutex};

    // Collector wrapper whose first `process_chunk` call parks on a latch.
    // This stalls the worker so the queue can be filled from the test thread.
    struct LatchedCollector {
        latch: Arc<(Mutex<bool>, Condvar)>,
        inner: StreamingCollector,
        blocked: bool,
    }
    impl StreamingPipeline<Point3f> for LatchedCollector {
        type Output = PointCloud<Point3f>;
        fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
            if !self.blocked {
                // Only the very first chunk waits; later chunks pass straight through.
                self.blocked = true;
                let (flag, signal) = &*self.latch;
                let guard = flag.lock().unwrap();
                // Park until the test flips the flag to `true`.
                let _released = signal.wait_while(guard, |open| !*open).unwrap();
            }
            self.inner.process_chunk(chunk)
        }
        fn finalize(self) -> Result<PointCloud<Point3f>> {
            self.inner.finalize()
        }
    }

    let latch = Arc::new((Mutex::new(false), Condvar::new()));
    let release_handle = Arc::clone(&latch);

    // chunk_size = 1 means every item triggers process_chunk, so the first one blocks.
    let rt = RealtimePipeline::new(
        LatchedCollector { latch, inner: StreamingCollector::new(), blocked: false },
        BackpressureConfig { max_queue_depth: 1, chunk_size: 1, flush_timeout: None },
    );

    // The first item is meant to be picked up by the worker, which then stalls.
    rt.send(Point3f::new(0.0, 0.0, 0.0)).unwrap();
    // Leave the worker time to dequeue the item and enter process_chunk.
    std::thread::sleep(Duration::from_millis(20));

    // With a queue depth of 1 and a stalled worker, most of these must be rejected.
    let accepted = (1..=8_u32)
        .filter(|&i| rt.try_send(Point3f::new(i as f32, 0.0, 0.0)).unwrap())
        .count();
    let dropped = 8 - accepted;
    assert!(dropped > 0, "expected at least one drop with max_queue_depth=1");

    // Open the latch so the worker can drain what was accepted.
    {
        let (flag, signal) = &*release_handle;
        *flag.lock().unwrap() = true;
        signal.notify_all();
    }

    // Read the drop counter before `finish()` consumes the pipeline.
    let total_dropped = rt.metrics().items_dropped;
    let cloud = rt.finish().unwrap();
    // Output holds the one item from `send` plus every accepted `try_send`.
    assert_eq!(cloud.len(), 1 + accepted);
    assert_eq!(total_dropped, dropped as u64);
}
974
/// Dropping a realtime pipeline that still has queued work, without calling
/// `finish()`, must neither panic nor hang.
#[test]
fn test_realtime_drop_without_finish() {
    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig { max_queue_depth: 16, chunk_size: 4, flush_timeout: None },
    );

    (0..10_u32).for_each(|i| {
        rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
    });

    // The test passes if this returns: no panic, no deadlock.
    drop(rt);
}
984
/// Pushes 10 000 points through a realtime collector and checks that none
/// are lost when only blocking `send` is used.
#[test]
fn test_realtime_large_workload() {
    const N: u32 = 10_000;

    let rt = RealtimePipeline::new(
        StreamingCollector::new(),
        BackpressureConfig { max_queue_depth: 512, chunk_size: 128, flush_timeout: None },
    );

    for x in (0..N).map(|i| i as f32) {
        rt.send(Point3f::new(x, 0.0, 0.0)).unwrap();
    }

    let total = rt.finish().unwrap().len();
    assert_eq!(total, N as usize);
}
1000}