1use std::collections::HashMap;
54use std::sync::Arc;
55use std::sync::atomic::{AtomicU64, Ordering};
56use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
57use std::thread::{self, JoinHandle};
58use std::time::Duration;
59use threecrate_core::{Error, Point3f, PointCloud, Result};
60
61pub trait StreamingPipeline<T> {
75 type Output;
77
78 fn process_chunk(&mut self, chunk: &[T]) -> Result<()>;
81
82 fn finalize(self) -> Result<Self::Output>;
84
85 fn memory_bytes(&self) -> usize { 0 }
88}
89
/// Counters returned by a pipeline run.
#[derive(Debug, Clone, Default)]
pub struct RunStats {
    /// Total number of items handed to the pipeline, summed over all chunks.
    pub items_processed: usize,
    /// Number of chunks passed to the pipeline.
    pub chunks_processed: usize,
    /// Number of source errors that were skipped rather than propagated.
    pub errors_skipped: usize,
}
104
/// Options that control how a pipeline run reacts to errors yielded by the
/// source iterator.
///
/// `Default` is derived: the hand-written impl it replaces produced exactly
/// the derived value (`skip_errors: false`).
#[derive(Debug, Clone, Default)]
pub struct RunOptions {
    /// When `true`, an `Err` item from the source is counted and skipped.
    /// When `false` (the default), the first `Err` item aborts the run.
    pub skip_errors: bool,
}
117
118pub fn run_pipeline<T, P>(
129 pipeline: &mut P,
130 source: impl Iterator<Item = Result<T>>,
131 chunk_size: usize,
132) -> Result<RunStats>
133where
134 P: StreamingPipeline<T>,
135{
136 run_pipeline_with_options(pipeline, source, chunk_size, &RunOptions::default())
137}
138
139pub fn run_pipeline_with_options<T, P>(
141 pipeline: &mut P,
142 source: impl Iterator<Item = Result<T>>,
143 chunk_size: usize,
144 opts: &RunOptions,
145) -> Result<RunStats>
146where
147 P: StreamingPipeline<T>,
148{
149 if chunk_size == 0 {
150 return Err(Error::InvalidData("chunk_size must be ≥ 1".into()));
151 }
152
153 let mut stats = RunStats::default();
154 let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
155
156 for item in source {
157 match item {
158 Ok(point) => {
159 chunk.push(point);
160 if chunk.len() == chunk_size {
161 pipeline.process_chunk(&chunk)?;
162 stats.items_processed += chunk.len();
163 stats.chunks_processed += 1;
164 chunk.clear();
165 }
166 }
167 Err(e) => {
168 if opts.skip_errors {
169 stats.errors_skipped += 1;
170 } else {
171 return Err(e);
172 }
173 }
174 }
175 }
176
177 if !chunk.is_empty() {
179 pipeline.process_chunk(&chunk)?;
180 stats.items_processed += chunk.len();
181 stats.chunks_processed += 1;
182 }
183
184 Ok(stats)
185}
186
/// Settings for [`StreamingVoxelFilter`].
#[derive(Debug, Clone)]
pub struct StreamingVoxelFilterConfig {
    /// Edge length of one voxel, in the same units as the point coordinates.
    /// The filter rejects values that are not greater than zero.
    pub voxel_size: f32,
}
198
199pub struct StreamingVoxelFilter {
213 config: StreamingVoxelFilterConfig,
214 voxels: HashMap<(i32, i32, i32), ([f64; 3], u32)>,
216}
217
218impl StreamingVoxelFilter {
219 pub fn new(config: StreamingVoxelFilterConfig) -> Self {
221 Self { config, voxels: HashMap::new() }
222 }
223
224 #[inline]
225 fn voxel_key(&self, p: &Point3f) -> (i32, i32, i32) {
226 let inv = 1.0 / self.config.voxel_size;
227 (
228 (p.x * inv).floor() as i32,
229 (p.y * inv).floor() as i32,
230 (p.z * inv).floor() as i32,
231 )
232 }
233
234 pub fn voxel_count(&self) -> usize { self.voxels.len() }
236}
237
238impl StreamingPipeline<Point3f> for StreamingVoxelFilter {
239 type Output = PointCloud<Point3f>;
240
241 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
242 if self.config.voxel_size <= 0.0 {
243 return Err(Error::InvalidData("voxel_size must be positive".into()));
244 }
245 for p in chunk {
246 let key = self.voxel_key(p);
247 let entry = self.voxels.entry(key).or_insert(([0.0; 3], 0));
248 entry.0[0] += p.x as f64;
249 entry.0[1] += p.y as f64;
250 entry.0[2] += p.z as f64;
251 entry.1 += 1;
252 }
253 Ok(())
254 }
255
256 fn finalize(self) -> Result<PointCloud<Point3f>> {
257 let points: Vec<Point3f> = self
258 .voxels
259 .values()
260 .map(|(sum, count)| {
261 let n = *count as f64;
262 Point3f::new((sum[0] / n) as f32, (sum[1] / n) as f32, (sum[2] / n) as f32)
263 })
264 .collect();
265 Ok(PointCloud::from_points(points))
266 }
267
268 fn memory_bytes(&self) -> usize {
269 self.voxels.len() * 90
271 }
272}
273
274#[derive(Debug, Clone)]
280pub struct PointCloudStats {
281 pub point_count: u64,
283 pub min: Point3f,
285 pub max: Point3f,
287 pub mean: Point3f,
289}
290
/// Running accumulator for point count, per-axis bounds, and per-axis sums.
pub struct StreamingStatistics {
    /// Number of points seen so far.
    count: u64,
    /// Smallest `[x, y, z]` values seen so far.
    min: [f32; 3],
    /// Largest `[x, y, z]` values seen so far.
    max: [f32; 3],
    /// Running `[x, y, z]` sums, kept in `f64`.
    sum: [f64; 3],
}
301
302impl StreamingStatistics {
303 pub fn new() -> Self {
305 Self {
306 count: 0,
307 min: [f32::INFINITY; 3],
308 max: [f32::NEG_INFINITY; 3],
309 sum: [0.0; 3],
310 }
311 }
312}
313
314impl Default for StreamingStatistics {
315 fn default() -> Self { Self::new() }
316}
317
318impl StreamingPipeline<Point3f> for StreamingStatistics {
319 type Output = PointCloudStats;
320
321 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
322 for p in chunk {
323 self.count += 1;
324 self.min[0] = self.min[0].min(p.x);
325 self.min[1] = self.min[1].min(p.y);
326 self.min[2] = self.min[2].min(p.z);
327 self.max[0] = self.max[0].max(p.x);
328 self.max[1] = self.max[1].max(p.y);
329 self.max[2] = self.max[2].max(p.z);
330 self.sum[0] += p.x as f64;
331 self.sum[1] += p.y as f64;
332 self.sum[2] += p.z as f64;
333 }
334 Ok(())
335 }
336
337 fn finalize(self) -> Result<PointCloudStats> {
338 if self.count == 0 {
339 return Err(Error::InvalidData("no points were processed".into()));
340 }
341 let n = self.count as f64;
342 Ok(PointCloudStats {
343 point_count: self.count,
344 min: Point3f::new(self.min[0], self.min[1], self.min[2]),
345 max: Point3f::new(self.max[0], self.max[1], self.max[2]),
346 mean: Point3f::new(
347 (self.sum[0] / n) as f32,
348 (self.sum[1] / n) as f32,
349 (self.sum[2] / n) as f32,
350 ),
351 })
352 }
353
354 fn memory_bytes(&self) -> usize { std::mem::size_of::<Self>() }
355}
356
357pub struct StreamingCollector {
366 points: Vec<Point3f>,
367}
368
369impl StreamingCollector {
370 pub fn new() -> Self { Self { points: Vec::new() } }
372
373 pub fn with_capacity(cap: usize) -> Self {
375 Self { points: Vec::with_capacity(cap) }
376 }
377}
378
379impl Default for StreamingCollector {
380 fn default() -> Self { Self::new() }
381}
382
383impl StreamingPipeline<Point3f> for StreamingCollector {
384 type Output = PointCloud<Point3f>;
385
386 fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
387 self.points.extend_from_slice(chunk);
388 Ok(())
389 }
390
391 fn finalize(self) -> Result<PointCloud<Point3f>> {
392 Ok(PointCloud::from_points(self.points))
393 }
394
395 fn memory_bytes(&self) -> usize {
396 self.points.len() * std::mem::size_of::<Point3f>()
397 }
398}
399
400pub fn cloud_as_stream(
408 cloud: &PointCloud<Point3f>,
409) -> impl Iterator<Item = Result<Point3f>> + '_ {
410 cloud.points.iter().copied().map(Ok)
411}
412
/// Queueing and batching settings for [`RealtimePipeline`].
#[derive(Debug, Clone)]
pub struct BackpressureConfig {
    /// Capacity of the bounded channel between producers and the worker.
    pub max_queue_depth: usize,
    /// Number of items the worker buffers before calling `process_chunk`.
    pub chunk_size: usize,
    /// When `Some`, the worker flushes a partially filled chunk if no item
    /// arrives within this duration. When `None`, partial chunks are flushed
    /// only once the channel is closed.
    pub flush_timeout: Option<Duration>,
}
433
434impl Default for BackpressureConfig {
435 fn default() -> Self {
436 Self {
437 max_queue_depth: 1024,
438 chunk_size: 256,
439 flush_timeout: Some(Duration::from_millis(10)),
440 }
441 }
442}
443
/// Point-in-time copy of a realtime pipeline's counters.
#[derive(Debug, Clone, Default)]
pub struct RealtimeMetrics {
    /// Items accepted into the queue by `send` or `try_send`.
    pub items_queued: u64,
    /// Items the worker has taken off the queue.
    pub items_processed: u64,
    /// Items rejected by `try_send` because the queue was full.
    pub items_dropped: u64,
    /// `items_queued - items_processed`, clamped at zero.
    pub estimated_queue_depth: u64,
}
456
/// Atomic counters shared between the producer handle and the worker thread.
struct SharedMetrics {
    /// Incremented by the producer after an item was accepted into the queue.
    items_queued: AtomicU64,
    /// Incremented by the worker for every item it receives.
    items_processed: AtomicU64,
    /// Incremented by the producer when `try_send` finds the queue full.
    items_dropped: AtomicU64,
}
462
463impl SharedMetrics {
464 fn new() -> Arc<Self> {
465 Arc::new(Self {
466 items_queued: AtomicU64::new(0),
467 items_processed: AtomicU64::new(0),
468 items_dropped: AtomicU64::new(0),
469 })
470 }
471
472 fn snapshot(&self) -> RealtimeMetrics {
473 let queued = self.items_queued.load(Ordering::Relaxed);
474 let processed = self.items_processed.load(Ordering::Relaxed);
475 let dropped = self.items_dropped.load(Ordering::Relaxed);
476 RealtimeMetrics {
477 items_queued: queued,
478 items_processed: processed,
479 items_dropped: dropped,
480 estimated_queue_depth: queued.saturating_sub(processed),
481 }
482 }
483}
484
485pub struct RealtimePipeline<T: Send + 'static, O: Send + 'static> {
520 sender: Option<SyncSender<T>>,
521 metrics: Arc<SharedMetrics>,
522 join_handle: Option<JoinHandle<Result<O>>>,
523}
524
525impl<T: Send + 'static, O: Send + 'static> RealtimePipeline<T, O> {
526 pub fn new<P>(pipeline: P, config: BackpressureConfig) -> Self
530 where
531 P: StreamingPipeline<T, Output = O> + Send + 'static,
532 {
533 assert!(config.chunk_size >= 1, "chunk_size must be ≥ 1");
534 assert!(config.max_queue_depth >= 1, "max_queue_depth must be ≥ 1");
535
536 let (sender, receiver) = sync_channel::<T>(config.max_queue_depth);
537 let metrics = SharedMetrics::new();
538 let metrics_worker = Arc::clone(&metrics);
539 let chunk_size = config.chunk_size;
540 let flush_timeout = config.flush_timeout;
541
542 let join_handle = thread::spawn(move || {
543 realtime_worker(receiver, pipeline, chunk_size, flush_timeout, metrics_worker)
544 });
545
546 Self { sender: Some(sender), metrics, join_handle: Some(join_handle) }
547 }
548
549 pub fn send(&self, item: T) -> Result<()> {
553 let sender = self
554 .sender
555 .as_ref()
556 .ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
557 sender
558 .send(item)
559 .map_err(|_| Error::InvalidData("pipeline worker has terminated".into()))?;
560 self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
561 Ok(())
562 }
563
564 pub fn try_send(&self, item: T) -> Result<bool> {
571 let sender = self
572 .sender
573 .as_ref()
574 .ok_or_else(|| Error::InvalidData("pipeline already finished".into()))?;
575 match sender.try_send(item) {
576 Ok(()) => {
577 self.metrics.items_queued.fetch_add(1, Ordering::Relaxed);
578 Ok(true)
579 }
580 Err(TrySendError::Full(_)) => {
581 self.metrics.items_dropped.fetch_add(1, Ordering::Relaxed);
582 Ok(false)
583 }
584 Err(TrySendError::Disconnected(_)) => {
585 Err(Error::InvalidData("pipeline worker has terminated".into()))
586 }
587 }
588 }
589
590 pub fn metrics(&self) -> RealtimeMetrics {
592 self.metrics.snapshot()
593 }
594
595 pub fn finish(mut self) -> Result<O> {
598 self.sender = None;
599 self.join_handle
600 .take()
601 .expect("pipeline already finished")
602 .join()
603 .map_err(|_| Error::InvalidData("pipeline worker panicked".into()))?
604 }
605}
606
607impl<T: Send + 'static, O: Send + 'static> Drop for RealtimePipeline<T, O> {
608 fn drop(&mut self) {
609 self.sender = None;
610 if let Some(handle) = self.join_handle.take() {
611 let _ = handle.join();
612 }
613 }
614}
615
616fn realtime_worker<T, P>(
617 receiver: std::sync::mpsc::Receiver<T>,
618 mut pipeline: P,
619 chunk_size: usize,
620 flush_timeout: Option<Duration>,
621 metrics: Arc<SharedMetrics>,
622) -> Result<P::Output>
623where
624 P: StreamingPipeline<T>,
625{
626 let mut chunk: Vec<T> = Vec::with_capacity(chunk_size);
627
628 match flush_timeout {
629 None => {
630 for item in receiver {
631 metrics.items_processed.fetch_add(1, Ordering::Relaxed);
632 chunk.push(item);
633 if chunk.len() >= chunk_size {
634 pipeline.process_chunk(&chunk)?;
635 chunk.clear();
636 }
637 }
638 }
639 Some(timeout) => {
640 use std::sync::mpsc::RecvTimeoutError;
641 loop {
642 match receiver.recv_timeout(timeout) {
643 Ok(item) => {
644 metrics.items_processed.fetch_add(1, Ordering::Relaxed);
645 chunk.push(item);
646 if chunk.len() >= chunk_size {
647 pipeline.process_chunk(&chunk)?;
648 chunk.clear();
649 }
650 }
651 Err(RecvTimeoutError::Timeout) => {
652 if !chunk.is_empty() {
653 pipeline.process_chunk(&chunk)?;
654 chunk.clear();
655 }
656 }
657 Err(RecvTimeoutError::Disconnected) => break,
658 }
659 }
660 }
661 }
662
663 if !chunk.is_empty() {
664 pipeline.process_chunk(&chunk)?;
665 }
666
667 pipeline.finalize()
668}
669
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `n` points spaced 0.1 apart along the x axis (y = z = 0).
    fn grid_cloud(n: usize) -> PointCloud<Point3f> {
        let pts: Vec<Point3f> = (0..n)
            .map(|i| Point3f::new(i as f32 * 0.1, 0.0, 0.0))
            .collect();
        PointCloud::from_points(pts)
    }

    // ---- synchronous runner -------------------------------------------

    #[test]
    fn test_collector_round_trip() {
        let cloud = grid_cloud(25);
        let mut collector = StreamingCollector::new();
        let stats = run_pipeline(&mut collector, cloud_as_stream(&cloud), 8).unwrap();
        let out = collector.finalize().unwrap();

        assert_eq!(stats.items_processed, 25);
        // 25 items in chunks of 8: three full chunks plus one partial chunk.
        assert_eq!(stats.chunks_processed, 4);
        assert_eq!(out.len(), 25);
    }

    #[test]
    fn test_statistics_correctness() {
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.0, 0.0, 0.0),
            Point3f::new(1.0, 0.0, 0.0),
            Point3f::new(2.0, 0.0, 0.0),
        ]);
        let mut stats_pipe = StreamingStatistics::new();
        run_pipeline(&mut stats_pipe, cloud_as_stream(&cloud), 2).unwrap();
        let s = stats_pipe.finalize().unwrap();

        assert_eq!(s.point_count, 3);
        assert!((s.min.x - 0.0).abs() < 1e-6);
        assert!((s.max.x - 2.0).abs() < 1e-6);
        assert!((s.mean.x - 1.0).abs() < 1e-6);
    }

    #[test]
    fn test_statistics_empty_fails() {
        // `finalize` takes `self` by value, so the binding needs no `mut`.
        let stats_pipe = StreamingStatistics::new();
        assert!(stats_pipe.finalize().is_err());
    }

    #[test]
    fn test_voxel_filter_reduces_density() {
        // 100 points spanning x in [0, 9.9] with unit voxels.
        let cloud = grid_cloud(100);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 32).unwrap();
        let out = filter.finalize().unwrap();

        assert!(out.len() <= 10, "expected ≤10 voxels, got {}", out.len());
        assert!(!out.is_empty());
    }

    #[test]
    fn test_voxel_filter_centroid() {
        // Both points fall into voxel (0, 0, 0); the centroid is their mean.
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.3, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 10).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1);
        assert!((out.points[0].x - 0.2).abs() < 1e-5);
    }

    #[test]
    fn test_voxel_filter_across_chunk_boundary() {
        // Same voxel, but chunk size 1 delivers the two points in separate
        // `process_chunk` calls.
        let cloud = PointCloud::from_points(vec![
            Point3f::new(0.1, 0.0, 0.0),
            Point3f::new(0.9, 0.0, 0.0),
        ]);
        let config = StreamingVoxelFilterConfig { voxel_size: 1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 1).unwrap();
        let out = filter.finalize().unwrap();

        assert_eq!(out.len(), 1, "points in the same voxel across chunks should merge");
        assert!((out.points[0].x - 0.5).abs() < 1e-5);
    }

    #[test]
    fn test_invalid_voxel_size() {
        let config = StreamingVoxelFilterConfig { voxel_size: -1.0 };
        let mut filter = StreamingVoxelFilter::new(config);
        let cloud = PointCloud::from_points(vec![Point3f::new(0.0, 0.0, 0.0)]);
        let result = run_pipeline(&mut filter, cloud_as_stream(&cloud), 1);
        assert!(result.is_err());
    }

    #[test]
    fn test_skip_errors() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
            Ok(Point3f::new(1.0, 0.0, 0.0)),
        ];
        let mut collector = StreamingCollector::new();
        let run_stats = run_pipeline_with_options(
            &mut collector,
            source.into_iter(),
            10,
            &RunOptions { skip_errors: true },
        )
        .unwrap();
        let out = collector.finalize().unwrap();
        assert_eq!(out.len(), 2);
        assert_eq!(run_stats.errors_skipped, 1);
    }

    #[test]
    fn test_error_propagation() {
        let source: Vec<Result<Point3f>> = vec![
            Ok(Point3f::new(0.0, 0.0, 0.0)),
            Err(Error::InvalidData("bad point".into())),
        ];
        let mut collector = StreamingCollector::new();
        // Default options do not skip errors, so the run must fail.
        assert!(run_pipeline(&mut collector, source.into_iter(), 10).is_err());
    }

    #[test]
    fn test_chunk_size_zero_fails() {
        let mut collector = StreamingCollector::new();
        let result = run_pipeline(
            &mut collector,
            std::iter::empty::<Result<Point3f>>(),
            0,
        );
        assert!(result.is_err());
    }

    #[test]
    fn test_memory_bytes() {
        let mut filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 0.5 });
        let cloud = grid_cloud(20);
        run_pipeline(&mut filter, cloud_as_stream(&cloud), 5).unwrap();
        assert!(filter.memory_bytes() > 0);
    }

    // ---- realtime pipeline --------------------------------------------

    #[test]
    fn test_realtime_basic_round_trip() {
        let config = BackpressureConfig { max_queue_depth: 32, chunk_size: 8, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..50_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 50);
    }

    #[test]
    fn test_realtime_with_flush_timeout() {
        // The chunk size exceeds the item count, so every item reaches the
        // pipeline through a timeout flush or the final flush.
        let config = BackpressureConfig {
            max_queue_depth: 64,
            chunk_size: 100,
            flush_timeout: Some(Duration::from_millis(5)),
        };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..20_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 20);
    }

    #[test]
    fn test_realtime_voxel_filter() {
        let filter = StreamingVoxelFilter::new(StreamingVoxelFilterConfig { voxel_size: 1.0 });
        let config = BackpressureConfig { max_queue_depth: 64, chunk_size: 16, flush_timeout: None };
        let rt = RealtimePipeline::new(filter, config);
        for i in 0..100_u32 {
            rt.send(Point3f::new(i as f32 * 0.1, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert!(cloud.len() <= 10, "expected ≤10 voxels, got {}", cloud.len());
        assert!(!cloud.is_empty());
    }

    #[test]
    fn test_realtime_metrics_queued_count() {
        let config = BackpressureConfig { max_queue_depth: 64, chunk_size: 32, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..20_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let m = rt.metrics();
        assert_eq!(m.items_queued, 20);
        assert_eq!(m.items_dropped, 0);
        rt.finish().unwrap();
    }

    #[test]
    fn test_realtime_try_send_accepts_when_space() {
        let config = BackpressureConfig { max_queue_depth: 16, chunk_size: 8, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        let accepted = rt.try_send(Point3f::new(1.0, 0.0, 0.0)).unwrap();
        assert!(accepted, "should accept item when queue has space");
        let m = rt.metrics();
        assert_eq!(m.items_queued, 1);
        assert_eq!(m.items_dropped, 0);
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), 1);
    }

    #[test]
    fn test_realtime_try_send_drops_when_full() {
        use std::sync::{Condvar, Mutex};

        /// Collector whose first `process_chunk` call blocks until the latch
        /// is released, which stalls the worker so the queue can fill up.
        struct LatchedCollector {
            latch: Arc<(Mutex<bool>, Condvar)>,
            inner: StreamingCollector,
            blocked: bool,
        }
        impl StreamingPipeline<Point3f> for LatchedCollector {
            type Output = PointCloud<Point3f>;
            fn process_chunk(&mut self, chunk: &[Point3f]) -> Result<()> {
                if !self.blocked {
                    self.blocked = true;
                    let (lock, cv) = &*self.latch;
                    let mut released = lock.lock().unwrap();
                    while !*released {
                        released = cv.wait(released).unwrap();
                    }
                }
                self.inner.process_chunk(chunk)
            }
            fn finalize(self) -> Result<PointCloud<Point3f>> {
                self.inner.finalize()
            }
        }

        let latch = Arc::new((Mutex::new(false), Condvar::new()));
        let latch_release = Arc::clone(&latch);

        // Depth 1 and chunk size 1: the worker calls `process_chunk` on the
        // very first item and then stays blocked on the latch.
        let config = BackpressureConfig { max_queue_depth: 1, chunk_size: 1, flush_timeout: None };
        let rt = RealtimePipeline::new(
            LatchedCollector { latch, inner: StreamingCollector::new(), blocked: false },
            config,
        );

        rt.send(Point3f::new(0.0, 0.0, 0.0)).unwrap();
        // Give the worker time to dequeue the first item and block.
        std::thread::sleep(Duration::from_millis(20));

        // While the worker is stalled only one queue slot can ever free up,
        // so at most one of these attempts can be accepted.
        let mut accepted = 0usize;
        let mut dropped = 0usize;
        for i in 1..=8_u32 {
            if rt.try_send(Point3f::new(i as f32, 0.0, 0.0)).unwrap() {
                accepted += 1;
            } else {
                dropped += 1;
            }
        }
        assert!(dropped > 0, "expected at least one drop with max_queue_depth=1");

        // Release the worker so it can drain the queue and finish.
        let (lock, cv) = &*latch_release;
        *lock.lock().unwrap() = true;
        cv.notify_all();

        let total_dropped = rt.metrics().items_dropped;
        let cloud = rt.finish().unwrap();
        // The blocking `send` plus every accepted `try_send` must arrive.
        assert_eq!(cloud.len(), 1 + accepted);
        assert_eq!(total_dropped, dropped as u64);
    }

    #[test]
    fn test_realtime_drop_without_finish() {
        let config = BackpressureConfig { max_queue_depth: 16, chunk_size: 4, flush_timeout: None };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        for i in 0..10_u32 {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        // Dropping without `finish` must join the worker and return.
        drop(rt);
    }

    #[test]
    fn test_realtime_large_workload() {
        let config = BackpressureConfig {
            max_queue_depth: 512,
            chunk_size: 128,
            flush_timeout: None,
        };
        let rt = RealtimePipeline::new(StreamingCollector::new(), config);
        const N: u32 = 10_000;
        for i in 0..N {
            rt.send(Point3f::new(i as f32, 0.0, 0.0)).unwrap();
        }
        let cloud = rt.finish().unwrap();
        assert_eq!(cloud.len(), N as usize);
    }
}