1use std::collections::HashMap;
47use threecrate_core::{Error, Point3f, PointCloud, Result};
48
49pub trait StreamingPipeline<T> {
63 type Output;
65
66 fn process_chunk(&mut self, chunk: &[T]) -> Result<()>;
69
70 fn finalize(self) -> Result<Self::Output>;
72
73 fn memory_bytes(&self) -> usize { 0 }
76}
77
/// Counters describing one run of a pipeline over a source iterator.
#[derive(Debug, Clone, Default)]
pub struct RunStats {
    /// Total number of items handed to the pipeline.
    pub items_processed: usize,
    /// Number of chunks handed to the pipeline.
    pub chunks_processed: usize,
    /// Number of source errors that were skipped rather than aborting the run.
    pub errors_skipped: usize,
}
92
/// Options controlling how a pipeline run reacts to errors from the source.
///
/// The derived [`Default`] leaves `skip_errors` at `false`, so the first
/// source error aborts the run.
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// When `true`, `Err` items yielded by the source are counted and skipped;
    /// when `false`, the first one is returned to the caller.
    pub skip_errors: bool,
}
105
106pub fn run_pipeline<T, P>(
117 pipeline: &mut P,
118 source: impl Iterator<Item = Result<T>>,
119 chunk_size: usize,
120) -> Result<RunStats>
121where
122 P: StreamingPipeline<T>,
123{
124 run_pipeline_with_options(pipeline, source, chunk_size, &RunOptions::default())
125}
126
127pub fn run_pipeline_with_options<T, P>(
129 pipeline: &mut P,
130 source: impl Iterator<Item = Result<T>>,
131 chunk_size: usize,
132 opts: &RunOptions,
133) -> Result<RunStats>
134where
135 P: StreamingPipeline<T>,
136{
137 if chunk_size == 0 {
138 return Err(Error::InvalidData("chunk_size must be ≥ 1".into()));
139 }
140
141 let mut stats = RunStats::default();
142 let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
143
144 for item in source {
145 match item {
146 Ok(point) => {
147 chunk.push(point);
148 if chunk.len() == chunk_size {
149 pipeline.process_chunk(&chunk)?;
150 stats.items_processed += chunk.len();
151 stats.chunks_processed += 1;
152 chunk.clear();
153 }
154 }
155 Err(e) => {
156 if opts.skip_errors {
157 stats.errors_skipped += 1;
158 } else {
159 return Err(e);
160 }
161 }
162 }
163 }
164
165 if !chunk.is_empty() {
167 pipeline.process_chunk(&chunk)?;
168 stats.items_processed += chunk.len();
169 stats.chunks_processed += 1;
170 }
171
172 Ok(stats)
173}
174
/// Configuration for the streaming voxel filter.
#[derive(Debug, Clone)]
pub struct StreamingVoxelFilterConfig {
    /// Edge length of one voxel cell; must be positive.
    pub voxel_size: f32,
}
186
187pub struct StreamingVoxelFilter {
201 config: StreamingVoxelFilterConfig,
202 voxels: HashMap<(i32, i32, i32), ([f64; 3], u32)>,
204}
205
206impl StreamingVoxelFilter {
207 pub fn new(config: StreamingVoxelFilterConfig) -> Self {
209 Self { config, voxels: HashMap::new() }
210 }
211
212 #[inline]
213 fn voxel_key(&self, p: &Point3f) -> (i32, i32, i32) {
214 let inv = 1.0 / self.config.voxel_size;
215 (
216 (p.x * inv).floor() as i32,
217 (p.y * inv).floor() as i32,
218 (p.z * inv).floor() as i32,
219 )
220 }
221
222 pub fn voxel_count(&self) -> usize { self.voxels.len() }
224}
225
226impl StreamingPipeline<Point3f> for StreamingVoxelFilter {
227 type Output = PointCloud<Point3f>;
228
229 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
230 if self.config.voxel_size <= 0.0 {
231 return Err(Error::InvalidData("voxel_size must be positive".into()));
232 }
233 for p in chunk {
234 let key = self.voxel_key(p);
235 let entry = self.voxels.entry(key).or_insert(([0.0; 3], 0));
236 entry.0[0] += p.x as f64;
237 entry.0[1] += p.y as f64;
238 entry.0[2] += p.z as f64;
239 entry.1 += 1;
240 }
241 Ok(())
242 }
243
244 fn finalize(self) -> Result<PointCloud<Point3f>> {
245 let points: Vec<Point3f> = self
246 .voxels
247 .values()
248 .map(|(sum, count)| {
249 let n = *count as f64;
250 Point3f::new((sum[0] / n) as f32, (sum[1] / n) as f32, (sum[2] / n) as f32)
251 })
252 .collect();
253 Ok(PointCloud::from_points(points))
254 }
255
256 fn memory_bytes(&self) -> usize {
257 self.voxels.len() * 90
259 }
260}
261
262#[derive(Debug, Clone)]
268pub struct PointCloudStats {
269 pub point_count: u64,
271 pub min: Point3f,
273 pub max: Point3f,
275 pub mean: Point3f,
277}
278
/// Running accumulator for point count, per-axis bounds and per-axis sums.
pub struct StreamingStatistics {
    /// Number of points seen so far.
    count: u64,
    /// Smallest `[x, y, z]` coordinates seen so far.
    min: [f32; 3],
    /// Largest `[x, y, z]` coordinates seen so far.
    max: [f32; 3],
    /// Running `[x, y, z]` coordinate sums, kept in `f64`.
    sum: [f64; 3],
}
289
290impl StreamingStatistics {
291 pub fn new() -> Self {
293 Self {
294 count: 0,
295 min: [f32::INFINITY; 3],
296 max: [f32::NEG_INFINITY; 3],
297 sum: [0.0; 3],
298 }
299 }
300}
301
302impl Default for StreamingStatistics {
303 fn default() -> Self { Self::new() }
304}
305
306impl StreamingPipeline<Point3f> for StreamingStatistics {
307 type Output = PointCloudStats;
308
309 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
310 for p in chunk {
311 self.count += 1;
312 self.min[0] = self.min[0].min(p.x);
313 self.min[1] = self.min[1].min(p.y);
314 self.min[2] = self.min[2].min(p.z);
315 self.max[0] = self.max[0].max(p.x);
316 self.max[1] = self.max[1].max(p.y);
317 self.max[2] = self.max[2].max(p.z);
318 self.sum[0] += p.x as f64;
319 self.sum[1] += p.y as f64;
320 self.sum[2] += p.z as f64;
321 }
322 Ok(())
323 }
324
325 fn finalize(self) -> Result<PointCloudStats> {
326 if self.count == 0 {
327 return Err(Error::InvalidData("no points were processed".into()));
328 }
329 let n = self.count as f64;
330 Ok(PointCloudStats {
331 point_count: self.count,
332 min: Point3f::new(self.min[0], self.min[1], self.min[2]),
333 max: Point3f::new(self.max[0], self.max[1], self.max[2]),
334 mean: Point3f::new(
335 (self.sum[0] / n) as f32,
336 (self.sum[1] / n) as f32,
337 (self.sum[2] / n) as f32,
338 ),
339 })
340 }
341
342 fn memory_bytes(&self) -> usize { std::mem::size_of::<Self>() }
343}
344
345pub struct StreamingCollector {
354 points: Vec<Point3f>,
355}
356
357impl StreamingCollector {
358 pub fn new() -> Self { Self { points: Vec::new() } }
360
361 pub fn with_capacity(cap: usize) -> Self {
363 Self { points: Vec::with_capacity(cap) }
364 }
365}
366
367impl Default for StreamingCollector {
368 fn default() -> Self { Self::new() }
369}
370
371impl StreamingPipeline<Point3f> for StreamingCollector {
372 type Output = PointCloud<Point3f>;
373
374 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
375 self.points.extend_from_slice(chunk);
376 Ok(())
377 }
378
379 fn finalize(self) -> Result<PointCloud<Point3f>> {
380 Ok(PointCloud::from_points(self.points))
381 }
382
383 fn memory_bytes(&self) -> usize {
384 self.points.len() * std::mem::size_of::<Point3f>()
385 }
386}
387
388pub fn cloud_as_stream(
396 cloud: &PointCloud<Point3f>,
397) -> impl Iterator<Item = Result<Point3f>> + '_ {
398 cloud.points.iter().copied().map(Ok)
399}
400
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` points spaced 0.1 apart along the x axis.
    fn grid_cloud(n: usize) -> PointCloud<Point3f> {
        let pts: Vec<Point3f> = (0..n)
            .map(|i| Point3f::new(i as f32 * 0.1, 0.0, 0.0))
            .collect();
        PointCloud::from_points(pts)
    }

    /// 25 points in chunks of 8 give three full chunks plus one partial chunk.
    #[test]
    fn test_collector_round_trip() {
        let cloud = grid_cloud(25);
        let mut collector = StreamingCollector::new();
        let stats = run_pipeline(&mut collector, cloud_as_stream(&cloud), 8).unwrap();
        let out = collector.finalize().unwrap();

        assert_eq!(stats.items_processed, 25);
        assert_eq!(stats.chunks_processed, 4);
        assert_eq!(out.len(), 25);
    }

    /// Count, bounds and mean are correct when the input spans two chunks.
    #[test]
    fn test_statistics_correctness() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.0, 0.0, 0.0),
            Point3f::new(1.0, 0.0, 0.0),
            Point3f::new(2.0, 0.0, 0.0),
        ]);
        let mut stats_pipe = StreamingStatistics::new();
        run_pipeline(&mut stats_pipe, cloud_as_stream(&cloud), 2).unwrap();
        let s = stats_pipe.finalize().unwrap();

        assert_eq!(s.point_count, 3);
        assert!((s.min.x - 0.0).abs() < 1e-6);
        assert!((s.max.x - 2.0).abs() < 1e-6);
        assert!((s.mean.x - 1.0).abs() < 1e-6);
    }

    /// Finalizing without any input is an error.
    #[test]
    fn test_statistics_empty_fails() {
        // `finalize` takes `self` by value, so the binding needs no `mut`.
        let stats_pipe = StreamingStatistics::new();
        assert!(stats_pipe.finalize().is_err());
    }

    /// 100 points over roughly [0, 10) with unit voxels collapse to at most 10.
    #[test]
    fn test_voxel_filter_reduces_density() {
        let cloud = grid_cloud(100);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 32).unwrap();
        let out = filter.finalize().unwrap();

        assert!(out.len() <= 10, "expected ≤10 voxels, got {}", out.len());
        assert!(!out.is_empty());
    }

    /// Two points in the same voxel produce their centroid.
    #[test]
    fn test_voxel_filter_centroid() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.3, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 10).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1);
        assert!((out.points[0].x - 0.2).abs() < 1e-5);
    }

    /// Accumulation persists across chunks: chunk size 1 puts each point in
    /// its own chunk.
    #[test]
    fn test_voxel_filter_across_chunk_boundary() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.9, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 1).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1, "points in the same voxel across chunks should merge");
        assert!((out.points[0].x - 0.5).abs() < 1e-5);
    }

    /// A negative voxel size is rejected when the first chunk is processed.
    #[test]
    fn test_invalid_voxel_size() {
        let config = StreamingVoxelFilterConfig { voxel_size: -1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        let cloud = PointCloud::from_points(vec![Point3f::new(0.0, 0.0, 0.0)]);
        let result = run_pipeline(&mut filter, cloud_as_stream(&cloud), 1);
        assert!(result.is_err());
    }

    /// With `skip_errors`, bad items are counted and the rest still arrive.
    #[test]
    fn test_skip_errors() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
            Ok(Point3f::new(1.0, 0.0, 0.0)),
        ];
        let mut collector = StreamingCollector::new();
        let run_stats = run_pipeline_with_options(
            &mut collector,
            source.into_iter(),
            10,
            &RunOptions { skip_errors: true },
        )
        .unwrap();
        let out = collector.finalize().unwrap();
        assert_eq!(out.len(), 2);
        assert_eq!(run_stats.errors_skipped, 1);
    }

    /// Without `skip_errors`, the first source error aborts the run.
    #[test]
    fn test_error_propagation() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
        ];
        let mut collector = StreamingCollector::new();
        assert!(run_pipeline(&mut collector, source.into_iter(), 10).is_err());
    }

    /// A chunk size of zero is rejected even for an empty source.
    #[test]
    fn test_chunk_size_zero_fails() {
        let mut collector = StreamingCollector::new();
        let result = run_pipeline(
            &mut collector,
            std::iter::empty::<Result<Point3f>>(),
            0,
        );
        assert!(result.is_err());
    }

    /// The voxel filter reports non-zero memory once it holds voxels.
    #[test]
    fn test_memory_bytes() {
        let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.5 });
        let cloud = grid_cloud(20);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 5).unwrap();
        assert!(filter.memory_bytes() > 0);
    }
}