// threecrate_algorithms/streaming.rs
//! Streaming point cloud processing pipeline.
//!
//! Enables out-of-core processing of arbitrarily large point clouds by reading
//! and processing data in bounded-size chunks.  Only one chunk resides in RAM at
//! a time; the pipeline accumulates lightweight per-chunk state (e.g. a voxel map)
//! that is orders of magnitude smaller than the full dataset.
//!
//! # Architecture
//!
//! ```text
//! Source iterator                Pipeline stage           Output
//! (file / network / …)  ──►  process_chunk(&[T])  ──►  finalize()
//!      chunk 0                  accumulate state
//!      chunk 1                  accumulate state
//!      …                        …
//!      chunk N                  accumulate state
//! ```
//!
//! # Provided pipelines
//!
//! | Type | Description |
//! |---|---|
//! | [`StreamingVoxelFilter`] | Downsamples via a voxel grid; O(voxels) memory |
//! | [`StreamingStatistics`] | Accumulates bounding-box and point count |
//! | [`StreamingCollector`] | Collects all points (useful for testing) |
//!
//! # Example
//!
//! ```rust
//! use threecrate_algorithms::streaming::{
//!     StreamingPipeline, StreamingVoxelFilter, StreamingVoxelFilterConfig, run_pipeline,
//! };
//! use threecrate_core::Point3f;
//!
//! let points: Vec<Result<Point3f, _>> = vec![
//!     Ok(Point3f::new(0.0, 0.0, 0.0)),
//!     Ok(Point3f::new(0.05, 0.0, 0.0)),
//!     Ok(Point3f::new(1.0, 1.0, 1.0)),
//! ];
//! let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.1 });
//! let stats = run_pipeline(&mut filter, points.into_iter(), 2).unwrap();
//! let cloud = filter.finalize().unwrap();
//! println!("Downsampled to {} points", cloud.len());
//! ```
use std::collections::HashMap;
use threecrate_core::{Error, Point3f, PointCloud, Result};

// ---------------------------------------------------------------------------
// Core trait
// ---------------------------------------------------------------------------
/// Trait for chunk-based streaming processors.
///
/// Implementations accumulate state across calls to [`process_chunk`] and
/// produce a final result via [`finalize`].  The chunk size controls peak RAM
/// usage: smaller chunks use less memory at the cost of more function-call
/// overhead.
///
/// [`process_chunk`]: StreamingPipeline::process_chunk
/// [`finalize`]: StreamingPipeline::finalize
pub trait StreamingPipeline<T> {
    /// The type produced after all chunks have been processed.
    type Output;

    /// Ingest one chunk of items.  Called repeatedly until the source is
    /// exhausted.  `chunk` will never be empty (the runner only flushes
    /// non-empty buffers).
    fn process_chunk(&mut self, chunk: &[T]) -> Result<()>;

    /// Consume the pipeline and return the accumulated output.
    fn finalize(self) -> Result<Self::Output>;

    /// Estimated number of bytes currently held by this pipeline stage.
    /// Default returns `0`; override to expose real memory usage.
    fn memory_bytes(&self) -> usize { 0 }
}
// ---------------------------------------------------------------------------
// Pipeline runner
// ---------------------------------------------------------------------------

/// Statistics reported by [`run_pipeline`].
#[derive(Debug, Clone, Default)]
pub struct RunStats {
    /// Total number of successfully processed items.
    pub items_processed: usize,
    /// Number of chunks delivered to the pipeline.
    pub chunks_processed: usize,
    /// Number of items skipped due to errors (only non-zero when
    /// [`RunOptions::skip_errors`] is set).
    pub errors_skipped: usize,
}
/// Options for [`run_pipeline`].
#[derive(Debug, Clone)]
pub struct RunOptions {
    /// If `true`, item-level errors from the source iterator are counted and
    /// skipped rather than causing [`run_pipeline`] to return early.
    /// Default: `false`.
    pub skip_errors: bool,
}
101
102impl Default for RunOptions {
103    fn default() -> Self { Self { skip_errors: false } }
104}
105
106/// Drive `pipeline` by reading from `source` in chunks of `chunk_size` items.
107///
108/// Returns [`RunStats`] on success.  The iterator's item errors are propagated
109/// unless [`RunOptions::skip_errors`] is set.
110///
111/// # Arguments
112/// * `pipeline`   – A mutable reference to a [`StreamingPipeline`].
113/// * `source`     – Any iterator whose items are `Result<T>`.
114/// * `chunk_size` – Number of items to accumulate before calling
115///                  [`StreamingPipeline::process_chunk`].  Must be ≥ 1.
116pub fn run_pipeline<T, P>(
117    pipeline: &mut P,
118    source: impl Iterator<Item = Result<T>>,
119    chunk_size: usize,
120) -> Result<RunStats>
121where
122    P: StreamingPipeline<T>,
123{
124    run_pipeline_with_options(pipeline, source, chunk_size, &RunOptions::default())
125}
126
127/// Like [`run_pipeline`] but accepts explicit options.
128pub fn run_pipeline_with_options<T, P>(
129    pipeline: &mut P,
130    source: impl Iterator<Item = Result<T>>,
131    chunk_size: usize,
132    opts: &RunOptions,
133) -> Result<RunStats>
134where
135    P: StreamingPipeline<T>,
136{
137    if chunk_size == 0 {
138        return Err(Error::InvalidData("chunk_size must be ≥ 1".into()));
139    }
140
141    let mut stats = RunStats::default();
142    let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
143
144    for item in source {
145        match item {
146            Ok(point) => {
147                chunk.push(point);
148                if chunk.len() == chunk_size {
149                    pipeline.process_chunk(&chunk)?;
150                    stats.items_processed += chunk.len();
151                    stats.chunks_processed += 1;
152                    chunk.clear();
153                }
154            }
155            Err(e) => {
156                if opts.skip_errors {
157                    stats.errors_skipped += 1;
158                } else {
159                    return Err(e);
160                }
161            }
162        }
163    }
164
165    // Flush any remaining items.
166    if !chunk.is_empty() {
167        pipeline.process_chunk(&chunk)?;
168        stats.items_processed += chunk.len();
169        stats.chunks_processed += 1;
170    }
171
172    Ok(stats)
173}
// ---------------------------------------------------------------------------
// StreamingVoxelFilter
// ---------------------------------------------------------------------------

/// Configuration for [`StreamingVoxelFilter`].
#[derive(Debug, Clone)]
pub struct StreamingVoxelFilterConfig {
    /// Side length of each cubic voxel (same units as the point coordinates).
    /// Must be positive; validated when the first chunk is processed.
    pub voxel_size: f32,
}
/// Streaming voxel-grid downsampler.
///
/// Maintains a [`HashMap`] from voxel coordinates to a representative point.
/// Peak memory is `O(V)` where `V` is the number of occupied voxels in the
/// entire dataset — typically far smaller than N points.
///
/// Unlike the in-memory [`voxel_grid_filter`](crate::filtering::voxel_grid_filter),
/// no bounding-box pre-scan is required; voxel keys are derived by dividing
/// each coordinate by `voxel_size` and rounding toward negative infinity, so
/// they are consistent across all chunks.
///
/// The representative point for each voxel is the **centroid** of all points
/// assigned to that voxel, giving a smoother result than first-point selection.
pub struct StreamingVoxelFilter {
    config: StreamingVoxelFilterConfig,
    /// Per-voxel coordinate sum (accumulated as `f64`) and point count,
    /// keyed by integer voxel coordinates; used for centroid computation.
    voxels: HashMap<(i32, i32, i32), ([f64; 3], u32)>,
}
205
206impl StreamingVoxelFilter {
207    /// Create a new streaming voxel filter.
208    pub fn new(config: StreamingVoxelFilterConfig) -> Self {
209        Self { config, voxels: HashMap::new() }
210    }
211
212    #[inline]
213    fn voxel_key(&self, p: &Point3f) -> (i32, i32, i32) {
214        let inv = 1.0 / self.config.voxel_size;
215        (
216            (p.x * inv).floor() as i32,
217            (p.y * inv).floor() as i32,
218            (p.z * inv).floor() as i32,
219        )
220    }
221
222    /// Number of occupied voxels accumulated so far.
223    pub fn voxel_count(&self) -> usize { self.voxels.len() }
224}
225
226impl StreamingPipeline<Point3f> for StreamingVoxelFilter {
227    type Output = PointCloud<Point3f>;
228
229    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
230        if self.config.voxel_size <= 0.0 {
231            return Err(Error::InvalidData("voxel_size must be positive".into()));
232        }
233        for p in chunk {
234            let key = self.voxel_key(p);
235            let entry = self.voxels.entry(key).or_insert(([0.0; 3], 0));
236            entry.0[0] += p.x as f64;
237            entry.0[1] += p.y as f64;
238            entry.0[2] += p.z as f64;
239            entry.1 += 1;
240        }
241        Ok(())
242    }
243
244    fn finalize(self) -> Result<PointCloud<Point3f>> {
245        let points: Vec<Point3f> = self
246            .voxels
247            .values()
248            .map(|(sum, count)| {
249                let n = *count as f64;
250                Point3f::new((sum[0] / n) as f32, (sum[1] / n) as f32, (sum[2] / n) as f32)
251            })
252            .collect();
253        Ok(PointCloud::from_points(points))
254    }
255
256    fn memory_bytes(&self) -> usize {
257        // Each entry: key (12 bytes) + value (28 bytes) + HashMap overhead (~50 bytes).
258        self.voxels.len() * 90
259    }
260}
// ---------------------------------------------------------------------------
// StreamingStatistics
// ---------------------------------------------------------------------------

/// Accumulated statistics produced by [`StreamingStatistics`].
#[derive(Debug, Clone)]
pub struct PointCloudStats {
    /// Total number of points processed.
    pub point_count: u64,
    /// Minimum coordinate (axis-aligned bounding box corner).
    pub min: Point3f,
    /// Maximum coordinate (axis-aligned bounding box corner).
    pub max: Point3f,
    /// Per-axis mean coordinates.
    pub mean: Point3f,
}
/// Streaming statistics collector.
///
/// Computes bounding box, point count, and mean position in a single pass
/// without retaining any individual points.  Peak memory is `O(1)`.
pub struct StreamingStatistics {
    // Number of points seen so far.
    count: u64,
    // Per-axis running minimum; starts at +∞ so the first point always wins.
    min: [f32; 3],
    // Per-axis running maximum; starts at −∞ so the first point always wins.
    max: [f32; 3],
    // Per-axis coordinate sums, accumulated in f64 for the mean.
    sum: [f64; 3],
}
289
290impl StreamingStatistics {
291    /// Create a new statistics collector.
292    pub fn new() -> Self {
293        Self {
294            count: 0,
295            min: [f32::INFINITY; 3],
296            max: [f32::NEG_INFINITY; 3],
297            sum: [0.0; 3],
298        }
299    }
300}
301
302impl Default for StreamingStatistics {
303    fn default() -> Self { Self::new() }
304}
305
306impl StreamingPipeline<Point3f> for StreamingStatistics {
307    type Output = PointCloudStats;
308
309    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
310        for p in chunk {
311            self.count += 1;
312            self.min[0] = self.min[0].min(p.x);
313            self.min[1] = self.min[1].min(p.y);
314            self.min[2] = self.min[2].min(p.z);
315            self.max[0] = self.max[0].max(p.x);
316            self.max[1] = self.max[1].max(p.y);
317            self.max[2] = self.max[2].max(p.z);
318            self.sum[0] += p.x as f64;
319            self.sum[1] += p.y as f64;
320            self.sum[2] += p.z as f64;
321        }
322        Ok(())
323    }
324
325    fn finalize(self) -> Result<PointCloudStats> {
326        if self.count == 0 {
327            return Err(Error::InvalidData("no points were processed".into()));
328        }
329        let n = self.count as f64;
330        Ok(PointCloudStats {
331            point_count: self.count,
332            min: Point3f::new(self.min[0], self.min[1], self.min[2]),
333            max: Point3f::new(self.max[0], self.max[1], self.max[2]),
334            mean: Point3f::new(
335                (self.sum[0] / n) as f32,
336                (self.sum[1] / n) as f32,
337                (self.sum[2] / n) as f32,
338            ),
339        })
340    }
341
342    fn memory_bytes(&self) -> usize { std::mem::size_of::<Self>() }
343}
// ---------------------------------------------------------------------------
// StreamingCollector
// ---------------------------------------------------------------------------

/// Streaming pipeline stage that collects all points into a `PointCloud`.
///
/// Useful for testing or as a terminal stage when the full cloud must
/// eventually be materialized (e.g. after prior stages have filtered it down).
pub struct StreamingCollector {
    // All points received so far, in arrival order.
    points: Vec<Point3f>,
}
356
357impl StreamingCollector {
358    /// Create a new collector.
359    pub fn new() -> Self { Self { points: Vec::new() } }
360
361    /// Create a collector with pre-allocated capacity.
362    pub fn with_capacity(cap: usize) -> Self {
363        Self { points: Vec::with_capacity(cap) }
364    }
365}
366
367impl Default for StreamingCollector {
368    fn default() -> Self { Self::new() }
369}
370
371impl StreamingPipeline<Point3f> for StreamingCollector {
372    type Output = PointCloud<Point3f>;
373
374    fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
375        self.points.extend_from_slice(chunk);
376        Ok(())
377    }
378
379    fn finalize(self) -> Result<PointCloud<Point3f>> {
380        Ok(PointCloud::from_points(self.points))
381    }
382
383    fn memory_bytes(&self) -> usize {
384        self.points.len() * std::mem::size_of::<Point3f>()
385    }
386}
387
388// ---------------------------------------------------------------------------
389// Streaming source helpers
390// ---------------------------------------------------------------------------
391
392/// Wrap a `PointCloud` as a streaming source of `Result<Point3f>`.
393///
394/// Useful for testing pipelines without a file on disk.
395pub fn cloud_as_stream(
396    cloud: &PointCloud<Point3f>,
397) -> impl Iterator<Item = Result<Point3f>> + '_ {
398    cloud.points.iter().copied().map(Ok)
399}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Build a cloud of `n` collinear points spaced 0.1 apart along +x.
    fn grid_cloud(n: usize) -> PointCloud<Point3f> {
        let pts: Vec<Point3f> = (0..n)
            .map(|i| Point3f::new(i as f32 * 0.1, 0.0, 0.0))
            .collect();
        PointCloud::from_points(pts)
    }

    // ---- StreamingCollector -----------------------------------------------

    #[test]
    fn test_collector_round_trip() {
        let cloud = grid_cloud(25);
        let mut collector = StreamingCollector::new();
        let stats = run_pipeline(&mut collector, cloud_as_stream(&cloud), 8).unwrap();
        let out = collector.finalize().unwrap();

        assert_eq!(stats.items_processed, 25);
        assert_eq!(stats.chunks_processed, 4); // 8+8+8+1
        assert_eq!(out.len(), 25);
    }

    // ---- StreamingStatistics ----------------------------------------------

    #[test]
    fn test_statistics_correctness() {
        // Three points: (0,0,0), (1,0,0), (2,0,0)
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.0, 0.0, 0.0),
            Point3f::new(1.0, 0.0, 0.0),
            Point3f::new(2.0, 0.0, 0.0),
        ]);
        let mut stats_pipe = StreamingStatistics::new();
        run_pipeline(&mut stats_pipe, cloud_as_stream(&cloud), 2).unwrap();
        let s = stats_pipe.finalize().unwrap();

        assert_eq!(s.point_count, 3);
        assert!((s.min.x - 0.0).abs() < 1e-6);
        assert!((s.max.x - 2.0).abs() < 1e-6);
        assert!((s.mean.x - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_statistics_empty_fails() {
        let mut stats_pipe = StreamingStatistics::new();
        // No chunks processed — finalize should error.
        assert!(stats_pipe.finalize().is_err());
    }

    // ---- StreamingVoxelFilter ---------------------------------------------

    #[test]
    fn test_voxel_filter_reduces_density() {
        // 100 points at x=0..9.9 in steps of 0.1 → should collapse to ~10 voxels
        // with voxel_size=1.0.
        let cloud = grid_cloud(100);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 32).unwrap();
        let out = filter.finalize().unwrap();

        assert!(out.len() <= 10, "expected ≤10 voxels, got {}", out.len());
        assert!(!out.is_empty());
    }

    #[test]
    fn test_voxel_filter_centroid() {
        // Two points in the same voxel → centroid should be their midpoint.
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.3, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 10).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1);
        assert!((out.points[0].x - 0.2).abs() < 1e-5);
    }

    #[test]
    fn test_voxel_filter_across_chunk_boundary() {
        // The two points that belong to the same voxel are split across chunks.
        // The filter must still merge them (state persists across process_chunk calls).
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.9, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        // chunk_size=1 forces each point into its own chunk.
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 1).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1, "points in the same voxel across chunks should merge");
        assert!((out.points[0].x - 0.5).abs() < 1e-5);
    }

    #[test]
    fn test_invalid_voxel_size() {
        // Negative voxel size is rejected on the first process_chunk call.
        let config = StreamingVoxelFilterConfig { voxel_size: -1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        let cloud = PointCloud::from_points(vec![Point3f::new(0.0, 0.0, 0.0)]);
        let result = run_pipeline(&mut filter, cloud_as_stream(&cloud), 1);
        assert!(result.is_err());
    }

    // ---- run_pipeline options --------------------------------------------

    #[test]
    fn test_skip_errors() {
        // With skip_errors=true the bad item is counted, not fatal.
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
            Ok(Point3f::new(1.0, 0.0, 0.0)),
        ];
        let mut collector = StreamingCollector::new();
        let run_stats = run_pipeline_with_options(
            &mut collector,
            source.into_iter(),
            10,
            &RunOptions { skip_errors: true },
        )
        .unwrap();
        let out = collector.finalize().unwrap();
        assert_eq!(out.len(), 2);
        assert_eq!(run_stats.errors_skipped, 1);
    }

    #[test]
    fn test_error_propagation() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
        ];
        let mut collector = StreamingCollector::new();
        // Default options: errors propagate.
        assert!(run_pipeline(&mut collector, source.into_iter(), 10).is_err());
    }

    #[test]
    fn test_chunk_size_zero_fails() {
        let mut collector = StreamingCollector::new();
        let result = run_pipeline(
            &mut collector,
            std::iter::empty::<Result<Point3f>>(),
            0,
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_memory_bytes() {
        let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.5 });
        let cloud = grid_cloud(20);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 5).unwrap();
        // At least some voxels should be occupied and memory should be > 0.
        assert!(filter.memory_bytes() > 0);
    }
}