1use crate::data::point::{DataPoint, Point2D};
7use crate::error::{ChartError, ChartResult, DataError};
8use crate::memory::{ManagedSlidingWindow, MemoryStats};
9use heapless::Vec;
10
11#[derive(Debug, Clone, Copy)]
13pub struct StreamingConfig {
14 pub buffer_capacity: usize,
16 pub update_interval: u32,
18 pub auto_prune: bool,
20 pub max_data_age: u32,
22 pub auto_scale: bool,
24 pub memory_threshold: f32,
26}
27
28impl Default for StreamingConfig {
29 fn default() -> Self {
30 Self {
31 buffer_capacity: 100,
32 update_interval: 100, auto_prune: true,
34 max_data_age: 0,
35 auto_scale: true,
36 memory_threshold: 80.0,
37 }
38 }
39}
40
41pub struct UnifiedStreamingBuffer<const N: usize> {
44 buffer: ManagedSlidingWindow<Point2D, N>,
46 config: StreamingConfig,
48 last_update: u32,
50 bounds: Option<crate::data::bounds::DataBounds<f32, f32>>,
52 metrics: StreamingMetrics,
54}
55
56#[derive(Debug, Clone, Copy, Default)]
58pub struct StreamingMetrics {
59 pub total_points: u64,
61 pub dropped_points: u64,
63 pub pruned_points: u64,
65 pub avg_latency_us: u32,
67 pub peak_memory_usage: usize,
69 pub current_update_rate: f32,
71}
72
73impl<const N: usize> UnifiedStreamingBuffer<N> {
74 pub fn new() -> Self {
76 Self::with_config(StreamingConfig::default())
77 }
78
79 pub fn with_config(config: StreamingConfig) -> Self {
81 Self {
82 buffer: ManagedSlidingWindow::new(),
83 config,
84 last_update: 0,
85 bounds: None,
86 metrics: StreamingMetrics::default(),
87 }
88 }
89
90 pub fn push_with_timestamp(&mut self, point: Point2D, timestamp: u32) -> ChartResult<()> {
92 let start_time = self.get_current_time_us();
93
94 if self.config.auto_prune && self.config.max_data_age > 0 {
96 self.prune_old_data(timestamp)?;
97 }
98
99 if self.buffer.memory_stats().utilization_percent() > self.config.memory_threshold {
101 return Err(ChartError::DataError(DataError::BUFFER_FULL));
102 }
103
104 self.buffer.push(point);
106 self.metrics.total_points += 1;
107
108 if self.config.auto_scale {
110 self.update_bounds();
111 }
112
113 let end_time = self.get_current_time_us();
115 self.update_latency_metrics(end_time - start_time);
116 self.update_memory_metrics();
117
118 self.last_update = timestamp;
119 Ok(())
120 }
121
122 pub fn push(&mut self, point: Point2D) -> ChartResult<()> {
124 let timestamp = self.get_current_time_ms();
125 self.push_with_timestamp(point, timestamp)
126 }
127
128 pub fn data(&self) -> impl Iterator<Item = Point2D> + '_ {
130 self.buffer.iter()
131 }
132
133 pub fn len(&self) -> usize {
135 self.buffer.len()
136 }
137
138 pub fn is_empty(&self) -> bool {
140 self.buffer.is_empty()
141 }
142
143 pub fn clear(&mut self) {
145 self.buffer.clear();
146 self.bounds = None;
147 self.metrics = StreamingMetrics::default();
148 }
149
150 pub fn capacity(&self) -> usize {
152 N
153 }
154
155 pub fn memory_stats(&self) -> &MemoryStats {
157 self.buffer.memory_stats()
158 }
159
160 pub fn metrics(&self) -> &StreamingMetrics {
162 &self.metrics
163 }
164
165 pub fn bounds(&self) -> Option<crate::data::bounds::DataBounds<f32, f32>> {
167 self.bounds
168 }
169
170 pub fn update_config(&mut self, config: StreamingConfig) {
172 self.config = config;
173 }
174
175 pub fn config(&self) -> &StreamingConfig {
177 &self.config
178 }
179
180 fn prune_old_data(&mut self, _current_time: u32) -> ChartResult<()> {
182 Ok(())
189 }
190
191 fn update_bounds(&mut self) {
193 if let Ok(bounds) = crate::data::bounds::calculate_bounds(self.buffer.iter()) {
194 self.bounds = Some(bounds);
195 }
196 }
197
198 fn update_latency_metrics(&mut self, latency_us: u32) {
200 if self.metrics.avg_latency_us == 0 {
202 self.metrics.avg_latency_us = latency_us;
203 } else {
204 self.metrics.avg_latency_us = (self.metrics.avg_latency_us * 7 + latency_us) / 8;
205 }
206 }
207
208 fn update_memory_metrics(&mut self) {
210 let current_usage = self.buffer.memory_stats().used;
211 if current_usage > self.metrics.peak_memory_usage {
212 self.metrics.peak_memory_usage = current_usage;
213 }
214 }
215
216 fn get_current_time_ms(&self) -> u32 {
218 self.metrics.total_points as u32
221 }
222
223 fn get_current_time_us(&self) -> u32 {
225 self.get_current_time_ms() * 1000
227 }
228}
229
230impl<const N: usize> Default for UnifiedStreamingBuffer<N> {
231 fn default() -> Self {
232 Self::new()
233 }
234}
235
236pub struct StreamingDataPipeline<T: Copy + Clone + DataPoint, const N: usize> {
238 sources: Vec<StreamingDataSource<N>, 8>,
240 animation: crate::animation::StreamingAnimator<T>,
242 config: PipelineConfig,
244 metrics: PipelineMetrics,
246}
247
248#[derive(Debug, Clone, Copy)]
250pub struct PipelineConfig {
251 pub max_sources: usize,
253 pub sync_mode: SyncMode,
255 pub error_recovery: ErrorRecovery,
257 pub monitoring_enabled: bool,
259}
260
261#[derive(Debug, Clone, Copy, PartialEq)]
263pub enum SyncMode {
264 Independent,
266 Synchronized,
268 FastestMaster,
270}
271
272#[derive(Debug, Clone, Copy, PartialEq)]
274pub enum ErrorRecovery {
275 StopOnError,
277 ContinueOnError,
279 RetryOnError,
281}
282
283#[derive(Debug, Clone, Copy, Default)]
285pub struct PipelineMetrics {
286 pub total_processed: u64,
288 pub sync_events: u64,
290 pub error_count: u64,
292 pub avg_pipeline_latency_us: u32,
294 pub throughput_pps: f32,
296}
297
298impl Default for PipelineConfig {
299 fn default() -> Self {
300 Self {
301 max_sources: 8,
302 sync_mode: SyncMode::Independent,
303 error_recovery: ErrorRecovery::ContinueOnError,
304 monitoring_enabled: true,
305 }
306 }
307}
308
309impl<T: Copy + Clone + DataPoint, const N: usize> StreamingDataPipeline<T, N> {
310 pub fn new(update_rate_hz: u32) -> Self {
312 Self::with_config(update_rate_hz, PipelineConfig::default())
313 }
314
315 pub fn with_config(_update_rate_hz: u32, config: PipelineConfig) -> Self {
317 Self {
318 sources: Vec::new(),
319 animation: crate::animation::StreamingAnimator::new(),
320 config,
321 metrics: PipelineMetrics::default(),
322 }
323 }
324
325 pub fn add_source(&mut self, source: StreamingDataSource<N>) -> ChartResult<usize> {
327 if self.sources.len() >= self.config.max_sources {
328 return Err(ChartError::DataError(DataError::BUFFER_FULL));
329 }
330
331 let index = self.sources.len();
332 self.sources
333 .push(source)
334 .map_err(|_| ChartError::DataError(DataError::BUFFER_FULL))?;
335
336 Ok(index)
337 }
338
339 pub fn update(&mut self, delta_time: crate::time::Milliseconds) -> ChartResult<bool> {
341 let start_time = self.get_current_time_us();
342 let mut updated = false;
343
344 if self.animation.update_with_delta(delta_time)? {
346 updated = true;
347 }
348
349 if self.config.monitoring_enabled {
351 let end_time = self.get_current_time_us();
352 self.update_pipeline_metrics(end_time - start_time);
353 }
354
355 Ok(updated)
356 }
357
358 pub fn current_data(&self) -> impl Iterator<Item = T> + '_ {
360 self.animation.current_data()
361 }
362
363 pub fn metrics(&self) -> &PipelineMetrics {
365 &self.metrics
366 }
367
368 pub fn source_count(&self) -> usize {
370 self.sources.len()
371 }
372
373 fn update_pipeline_metrics(&mut self, latency_us: u32) {
375 if self.metrics.avg_pipeline_latency_us == 0 {
377 self.metrics.avg_pipeline_latency_us = latency_us;
378 } else {
379 self.metrics.avg_pipeline_latency_us =
380 (self.metrics.avg_pipeline_latency_us * 7 + latency_us) / 8;
381 }
382 }
383
384 fn get_current_time_us(&self) -> u32 {
386 self.metrics.total_processed as u32
388 }
389}
390
391pub struct StreamingDataSource<const N: usize> {
393 buffer: UnifiedStreamingBuffer<N>,
395 id: u32,
397 config: SourceConfig,
399 state: SourceState,
401}
402
403#[derive(Debug, Clone, Copy)]
405pub struct SourceConfig {
406 pub update_interval: u32,
408 pub priority: u8,
410 pub validate_data: bool,
412 pub max_errors: u32,
414}
415
416#[derive(Debug, Clone, Copy, PartialEq)]
418pub enum SourceState {
419 Active,
421 Paused,
423 Error,
425 Disabled,
427}
428
429impl Default for SourceConfig {
430 fn default() -> Self {
431 Self {
432 update_interval: 100,
433 priority: 1,
434 validate_data: true,
435 max_errors: 5,
436 }
437 }
438}
439
440impl<const N: usize> StreamingDataSource<N> {
441 pub fn new(id: u32) -> Self {
443 Self::with_config(id, SourceConfig::default())
444 }
445
446 pub fn with_config(id: u32, config: SourceConfig) -> Self {
448 let streaming_config = StreamingConfig {
449 update_interval: config.update_interval,
450 ..Default::default()
451 };
452
453 Self {
454 buffer: UnifiedStreamingBuffer::with_config(streaming_config),
455 id,
456 config,
457 state: SourceState::Active,
458 }
459 }
460
461 pub fn update(&mut self, point: Point2D) -> ChartResult<()> {
463 if self.state != SourceState::Active {
464 return Err(ChartError::DataError(DataError::INVALID_DATA_POINT));
465 }
466
467 if self.config.validate_data && !self.is_valid_point(&point) {
469 return Err(ChartError::DataError(DataError::INVALID_DATA_POINT));
470 }
471
472 self.buffer.push(point)
473 }
474
475 pub fn data(&self) -> impl Iterator<Item = Point2D> + '_ {
477 self.buffer.data()
478 }
479
480 pub fn id(&self) -> u32 {
482 self.id
483 }
484
485 pub fn state(&self) -> SourceState {
487 self.state
488 }
489
490 pub fn set_state(&mut self, state: SourceState) {
492 self.state = state;
493 }
494
495 pub fn config(&self) -> &SourceConfig {
497 &self.config
498 }
499
500 pub fn metrics(&self) -> &StreamingMetrics {
502 self.buffer.metrics()
503 }
504
505 fn is_valid_point(&self, point: &Point2D) -> bool {
507 point.x.is_finite() && point.y.is_finite()
509 }
510}
511
512pub struct StreamingChartManager<const MAX_CHARTS: usize> {
514 charts: Vec<ChartInstance, MAX_CHARTS>,
516 config: ManagerConfig,
518 metrics: ManagerMetrics,
520 sync_state: SyncState,
522}
523
524#[derive(Debug, Clone, Copy)]
526pub struct ManagerConfig {
527 pub global_update_rate: u32,
529 pub enable_sync: bool,
531 pub memory_strategy: MemoryStrategy,
533 pub monitoring_level: MonitoringLevel,
535}
536
537#[derive(Debug, Clone, Copy, PartialEq)]
539pub enum MemoryStrategy {
540 Conservative,
542 Balanced,
544 Performance,
546}
547
548#[derive(Debug, Clone, Copy, PartialEq)]
550pub enum MonitoringLevel {
551 None,
553 Basic,
555 Detailed,
557}
558
559#[derive(Debug)]
561pub struct ChartInstance {
562 pub id: u32,
564 pub chart_type: ChartType,
566 pub pipeline_id: u32,
568 pub config: ChartInstanceConfig,
570}
571
572#[derive(Debug, Clone, Copy, PartialEq)]
574pub enum ChartType {
575 Line,
577 Bar,
579 Scatter,
581 Gauge,
583 Custom,
585}
586
587#[derive(Debug, Clone, Copy)]
589pub struct ChartInstanceConfig {
590 pub priority: u8,
592 pub animations_enabled: bool,
594 pub memory_limit_bytes: usize,
596}
597
598#[derive(Debug, Clone, Copy, Default)]
600pub struct ManagerMetrics {
601 pub total_charts: u32,
603 pub active_charts: u32,
605 pub total_updates: u64,
607 pub avg_update_latency_us: u32,
609 pub total_memory_usage: usize,
611}
612
613#[derive(Debug, Clone, Copy, Default)]
615pub struct SyncState {
616 pub last_sync_time: u32,
618 pub pending_sync_count: u32,
620 pub sync_drift_us: i32,
622}
623
624impl Default for ManagerConfig {
625 fn default() -> Self {
626 Self {
627 global_update_rate: 30,
628 enable_sync: true,
629 memory_strategy: MemoryStrategy::Balanced,
630 monitoring_level: MonitoringLevel::Basic,
631 }
632 }
633}
634
635impl Default for ChartInstanceConfig {
636 fn default() -> Self {
637 Self {
638 priority: 1,
639 animations_enabled: true,
640 memory_limit_bytes: 4096,
641 }
642 }
643}
644
645impl<const MAX_CHARTS: usize> StreamingChartManager<MAX_CHARTS> {
646 pub fn new() -> Self {
648 Self::with_config(ManagerConfig::default())
649 }
650
651 pub fn with_config(config: ManagerConfig) -> Self {
653 Self {
654 charts: Vec::new(),
655 config,
656 metrics: ManagerMetrics::default(),
657 sync_state: SyncState::default(),
658 }
659 }
660
661 pub fn add_chart(
663 &mut self,
664 chart_type: ChartType,
665 pipeline_id: u32,
666 config: ChartInstanceConfig,
667 ) -> ChartResult<u32> {
668 if self.charts.len() >= MAX_CHARTS {
669 return Err(ChartError::DataError(DataError::BUFFER_FULL));
670 }
671
672 let chart_id = self.metrics.total_charts;
673 let instance = ChartInstance {
674 id: chart_id,
675 chart_type,
676 pipeline_id,
677 config,
678 };
679
680 self.charts
681 .push(instance)
682 .map_err(|_| ChartError::DataError(DataError::BUFFER_FULL))?;
683
684 self.metrics.total_charts += 1;
685 self.metrics.active_charts += 1;
686
687 Ok(chart_id)
688 }
689
690 pub fn update(&mut self, delta_time: crate::time::Milliseconds) -> ChartResult<()> {
692 let start_time = self.get_current_time_us();
693
694 if self.config.enable_sync {
696 self.update_sync_state(delta_time)?;
697 }
698
699 self.metrics.total_updates += 1;
701
702 if self.config.monitoring_level != MonitoringLevel::None {
704 let end_time = self.get_current_time_us();
705 self.update_manager_metrics(end_time - start_time);
706 }
707
708 Ok(())
709 }
710
711 pub fn metrics(&self) -> &ManagerMetrics {
713 &self.metrics
714 }
715
716 pub fn active_chart_count(&self) -> usize {
718 self.metrics.active_charts as usize
719 }
720
721 pub fn sync_state(&self) -> &SyncState {
723 &self.sync_state
724 }
725
726 fn update_sync_state(&mut self, _delta_time: crate::time::Milliseconds) -> ChartResult<()> {
728 self.sync_state.last_sync_time = self.get_current_time_ms();
730 Ok(())
731 }
732
733 fn update_manager_metrics(&mut self, latency_us: u32) {
735 if self.metrics.avg_update_latency_us == 0 {
737 self.metrics.avg_update_latency_us = latency_us;
738 } else {
739 self.metrics.avg_update_latency_us =
740 (self.metrics.avg_update_latency_us * 7 + latency_us) / 8;
741 }
742 }
743
744 fn get_current_time_ms(&self) -> u32 {
746 self.metrics.total_updates as u32
747 }
748
749 fn get_current_time_us(&self) -> u32 {
751 self.get_current_time_ms() * 1000
752 }
753}
754
755impl<const MAX_CHARTS: usize> Default for StreamingChartManager<MAX_CHARTS> {
756 fn default() -> Self {
757 Self::new()
758 }
759}
760
761#[cfg(test)]
762mod tests {
763 use super::*;
764
765 #[test]
766 fn test_unified_streaming_buffer() {
767 let mut buffer: UnifiedStreamingBuffer<10> = UnifiedStreamingBuffer::new();
768 assert!(buffer.is_empty());
769 assert_eq!(buffer.capacity(), 10);
770
771 buffer.push(Point2D::new(1.0, 2.0)).unwrap();
773 buffer.push(Point2D::new(2.0, 3.0)).unwrap();
774
775 assert_eq!(buffer.len(), 2);
776 assert!(!buffer.is_empty());
777
778 let data: Vec<Point2D, 10> = buffer.data().collect();
780 assert_eq!(data.len(), 2);
781 }
782
783 #[test]
784 fn test_streaming_data_source() {
785 let mut source: StreamingDataSource<5> = StreamingDataSource::new(1);
786 assert_eq!(source.id(), 1);
787 assert_eq!(source.state(), SourceState::Active);
788
789 source.update(Point2D::new(1.0, 2.0)).unwrap();
791 assert_eq!(source.data().count(), 1);
792
793 source.set_state(SourceState::Paused);
795 assert_eq!(source.state(), SourceState::Paused);
796
797 let result = source.update(Point2D::new(2.0, 3.0));
799 assert!(result.is_err());
800 }
801
802 #[test]
803 fn test_streaming_chart_manager() {
804 let mut manager: StreamingChartManager<5> = StreamingChartManager::new();
805 assert_eq!(manager.active_chart_count(), 0);
806
807 let chart_id = manager
809 .add_chart(ChartType::Line, 1, ChartInstanceConfig::default())
810 .unwrap();
811
812 assert_eq!(chart_id, 0);
813 assert_eq!(manager.active_chart_count(), 1);
814
815 manager.update(16).unwrap(); assert!(manager.metrics().total_updates > 0);
818 }
819
820 #[test]
821 fn test_streaming_config() {
822 let config = StreamingConfig {
823 buffer_capacity: 50,
824 update_interval: 50,
825 auto_prune: false,
826 ..Default::default()
827 };
828
829 let buffer: UnifiedStreamingBuffer<50> = UnifiedStreamingBuffer::with_config(config);
830 assert_eq!(buffer.config().buffer_capacity, 50);
831 assert_eq!(buffer.config().update_interval, 50);
832 assert!(!buffer.config().auto_prune);
833 }
834
835 #[test]
836 fn test_performance_metrics() {
837 let mut buffer: UnifiedStreamingBuffer<10> = UnifiedStreamingBuffer::new();
838
839 for i in 0..5 {
841 buffer.push(Point2D::new(i as f32, (i * 2) as f32)).unwrap();
842 }
843
844 let metrics = buffer.metrics();
845 assert_eq!(metrics.total_points, 5);
846 assert_eq!(metrics.dropped_points, 0);
847 }
848}