use crate::data::point::{DataPoint, Point2D};
use crate::error::{ChartError, ChartResult, DataError};
use crate::memory::{ManagedSlidingWindow, MemoryStats};
use heapless::Vec;
/// Settings that control how a `UnifiedStreamingBuffer` accepts and tracks points.
#[derive(Clone, Copy, Debug)]
pub struct StreamingConfig {
    /// Requested buffer capacity. The buffer visible in this file reports its capacity
    /// from its const generic parameter and never reads this field.
    pub buffer_capacity: usize,
    /// Update interval (unit is not established here — presumably milliseconds; confirm).
    pub update_interval: u32,
    /// Enables the pruning step before each push; only effective together with a
    /// non-zero `max_data_age`.
    pub auto_prune: bool,
    /// Maximum data age handed to pruning; `0` disables pruning even if `auto_prune` is set.
    pub max_data_age: u32,
    /// When `true`, data bounds are recomputed after every accepted point.
    pub auto_scale: bool,
    /// Storage utilization (percent) above which new points are rejected.
    pub memory_threshold: f32,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
buffer_capacity: 100,
update_interval: 100, auto_prune: true,
max_data_age: 0,
auto_scale: true,
memory_threshold: 80.0,
}
}
}
/// Streaming buffer of `Point2D` samples that also tracks bounds and metrics.
///
/// `N` is the capacity reported by `capacity()`.
pub struct UnifiedStreamingBuffer<const N: usize> {
    /// Backing storage for the points (a `ManagedSlidingWindow`).
    buffer: ManagedSlidingWindow<Point2D, N>,
    /// Active settings.
    config: StreamingConfig,
    /// Timestamp supplied with the most recent accepted push.
    last_update: u32,
    /// Result of the most recent successful bounds calculation, if any.
    bounds: Option<crate::data::bounds::DataBounds<f32, f32>>,
    /// Running counters and averages.
    metrics: StreamingMetrics,
}
/// Counters and running figures kept by a `UnifiedStreamingBuffer`.
#[derive(Clone, Copy, Debug, Default)]
pub struct StreamingMetrics {
    /// Number of points accepted into the buffer.
    pub total_points: u64,
    /// Counter for points that were dropped rather than stored.
    pub dropped_points: u64,
    /// Counter for points removed by pruning.
    pub pruned_points: u64,
    /// Smoothed push latency in microseconds (`0` means no sample has been recorded).
    pub avg_latency_us: u32,
    /// Largest storage usage observed after a push.
    pub peak_memory_usage: usize,
    /// Current update rate; not written by the code visible in this file.
    pub current_update_rate: f32,
}
impl<const N: usize> UnifiedStreamingBuffer<N> {
    /// Creates an empty buffer configured with `StreamingConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(StreamingConfig::default())
    }

    /// Creates an empty buffer that uses `config`.
    pub fn with_config(config: StreamingConfig) -> Self {
        Self {
            buffer: ManagedSlidingWindow::new(),
            config,
            last_update: 0,
            bounds: None,
            metrics: StreamingMetrics::default(),
        }
    }

    /// Stores `point` and remembers `timestamp` as the time of the last update.
    ///
    /// When `auto_prune` is set and `max_data_age` is non-zero, the pruning hook runs
    /// first. After a successful insert the point counter, the bounds (if `auto_scale`),
    /// the latency average and the peak-memory figure are refreshed.
    ///
    /// # Errors
    ///
    /// Returns `ChartError::DataError(DataError::BUFFER_FULL)` when the storage
    /// utilization is above `memory_threshold`; the rejected point is counted in
    /// `dropped_points`. Errors from the pruning hook are propagated unchanged.
    pub fn push_with_timestamp(&mut self, point: Point2D, timestamp: u32) -> ChartResult<()> {
        let start_time = self.get_current_time_us();

        if self.config.auto_prune && self.config.max_data_age > 0 {
            self.prune_old_data(timestamp)?;
        }

        if self.buffer.memory_stats().utilization_percent() > self.config.memory_threshold {
            // Keep the metrics honest: a rejected point is a dropped point.
            self.metrics.dropped_points += 1;
            return Err(ChartError::DataError(DataError::BUFFER_FULL));
        }

        self.buffer.push(point);
        self.metrics.total_points += 1;

        if self.config.auto_scale {
            self.update_bounds();
        }

        // The logical clock is a wrapping u32; wrapping_sub yields the correct
        // elapsed value even when the clock rolled over between the two reads.
        let end_time = self.get_current_time_us();
        self.update_latency_metrics(end_time.wrapping_sub(start_time));
        self.update_memory_metrics();
        self.last_update = timestamp;
        Ok(())
    }

    /// Stores `point`, using the buffer's internal logical clock as its timestamp.
    ///
    /// # Errors
    ///
    /// Same as [`Self::push_with_timestamp`].
    pub fn push(&mut self, point: Point2D) -> ChartResult<()> {
        let timestamp = self.get_current_time_ms();
        self.push_with_timestamp(point, timestamp)
    }

    /// Iterates over the buffered points, yielding them by value.
    pub fn data(&self) -> impl Iterator<Item = Point2D> + '_ {
        self.buffer.iter()
    }

    /// Number of points currently buffered.
    pub fn len(&self) -> usize {
        self.buffer.len()
    }

    /// `true` when no points are buffered.
    pub fn is_empty(&self) -> bool {
        self.buffer.is_empty()
    }

    /// Removes all points and resets the cached bounds, the metrics and the
    /// last-update timestamp, returning the buffer to its freshly constructed state
    /// (the configuration is kept).
    pub fn clear(&mut self) {
        self.buffer.clear();
        self.bounds = None;
        self.metrics = StreamingMetrics::default();
        self.last_update = 0;
    }

    /// Compile-time capacity `N`.
    pub fn capacity(&self) -> usize {
        N
    }

    /// Memory statistics reported by the backing storage.
    pub fn memory_stats(&self) -> &MemoryStats {
        self.buffer.memory_stats()
    }

    /// Streaming metrics collected so far.
    pub fn metrics(&self) -> &StreamingMetrics {
        &self.metrics
    }

    /// Most recently computed data bounds, or `None` if none have been computed.
    pub fn bounds(&self) -> Option<crate::data::bounds::DataBounds<f32, f32>> {
        self.bounds
    }

    /// Replaces the active configuration; buffered points are not touched.
    pub fn update_config(&mut self, config: StreamingConfig) {
        self.config = config;
    }

    /// Active configuration.
    pub fn config(&self) -> &StreamingConfig {
        &self.config
    }

    /// Pruning hook. Currently a no-op that always succeeds; `_current_time` is unused.
    fn prune_old_data(&mut self, _current_time: u32) -> ChartResult<()> {
        Ok(())
    }

    /// Recomputes the bounds over all buffered points. If the calculation fails the
    /// previously cached bounds are left in place.
    fn update_bounds(&mut self) {
        if let Ok(bounds) = crate::data::bounds::calculate_bounds(self.buffer.iter()) {
            self.bounds = Some(bounds);
        }
    }

    /// Folds `latency_us` into the running average with a 7/8 weight on the old value.
    /// An average of `0` is treated as "no sample yet" and is replaced outright.
    fn update_latency_metrics(&mut self, latency_us: u32) {
        let previous = self.metrics.avg_latency_us;
        self.metrics.avg_latency_us = if previous == 0 {
            latency_us
        } else {
            // Widen to u64 so `previous * 7 + latency_us` cannot overflow; the quotient
            // is at most u32::MAX, so narrowing back is lossless.
            ((u64::from(previous) * 7 + u64::from(latency_us)) / 8) as u32
        };
    }

    /// Raises `peak_memory_usage` to the storage's current usage if that is higher.
    fn update_memory_metrics(&mut self) {
        let current_usage = self.buffer.memory_stats().used;
        self.metrics.peak_memory_usage = self.metrics.peak_memory_usage.max(current_usage);
    }

    /// Logical clock in "milliseconds": the accepted-point counter, deliberately
    /// truncated to `u32`. No hardware timer is read here.
    fn get_current_time_ms(&self) -> u32 {
        self.metrics.total_points as u32
    }

    /// Logical clock in "microseconds". Wraps on overflow instead of panicking, which
    /// would otherwise happen once the point counter exceeds `u32::MAX / 1000`.
    fn get_current_time_us(&self) -> u32 {
        self.get_current_time_ms().wrapping_mul(1000)
    }
}
impl<const N: usize> Default for UnifiedStreamingBuffer<N> {
fn default() -> Self {
Self::new()
}
}
/// Pipeline that owns a set of streaming sources and an animator producing `T` values.
pub struct StreamingDataPipeline<T, const N: usize>
where
    T: Copy + Clone + DataPoint,
{
    /// Registered sources; the backing vector has room for 8 entries.
    sources: Vec<StreamingDataSource<N>, 8>,
    /// Animator advanced by `update` and read by `current_data`.
    animation: crate::animation::StreamingAnimator<T>,
    /// Active pipeline settings.
    config: PipelineConfig,
    /// Running pipeline metrics.
    metrics: PipelineMetrics,
}
/// Settings for a `StreamingDataPipeline`.
#[derive(Clone, Copy, Debug)]
pub struct PipelineConfig {
    /// Upper limit on registered sources checked by `add_source`.
    pub max_sources: usize,
    /// Synchronization policy; stored but not interpreted by the code visible in this file.
    pub sync_mode: SyncMode,
    /// Error-recovery policy; stored but not interpreted by the code visible in this file.
    pub error_recovery: ErrorRecovery,
    /// When `true`, `update` records pipeline latency metrics.
    pub monitoring_enabled: bool,
}
/// Synchronization policy selectable in `PipelineConfig`.
///
/// The code visible in this file only stores this value; the meaning of each variant is
/// implied by its name and should be confirmed against the code that consumes it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SyncMode {
    Independent,
    Synchronized,
    FastestMaster,
}
/// Error-recovery policy selectable in `PipelineConfig`.
///
/// The code visible in this file only stores this value; the meaning of each variant is
/// implied by its name and should be confirmed against the code that consumes it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ErrorRecovery {
    StopOnError,
    ContinueOnError,
    RetryOnError,
}
/// Counters and running figures kept by a `StreamingDataPipeline`.
#[derive(Clone, Copy, Debug, Default)]
pub struct PipelineMetrics {
    /// Processed-item counter; the pipeline also uses it as its logical clock.
    pub total_processed: u64,
    /// Counter for synchronization events.
    pub sync_events: u64,
    /// Counter for errors.
    pub error_count: u64,
    /// Smoothed pipeline latency in microseconds (`0` means no sample recorded).
    pub avg_pipeline_latency_us: u32,
    /// Throughput in points per second (presumed from the `_pps` suffix).
    pub throughput_pps: f32,
}
impl Default for PipelineConfig {
fn default() -> Self {
Self {
max_sources: 8,
sync_mode: SyncMode::Independent,
error_recovery: ErrorRecovery::ContinueOnError,
monitoring_enabled: true,
}
}
}
impl<T: Copy + Clone + DataPoint, const N: usize> StreamingDataPipeline<T, N> {
    /// Creates a pipeline with `PipelineConfig::default()`.
    ///
    /// `update_rate_hz` is forwarded to [`Self::with_config`], which currently ignores it.
    pub fn new(update_rate_hz: u32) -> Self {
        Self::with_config(update_rate_hz, PipelineConfig::default())
    }

    /// Creates a pipeline with no sources, a fresh animator and zeroed metrics.
    ///
    /// `_update_rate_hz` is accepted for API compatibility but is not used.
    pub fn with_config(_update_rate_hz: u32, config: PipelineConfig) -> Self {
        Self {
            sources: Vec::new(),
            animation: crate::animation::StreamingAnimator::new(),
            config,
            metrics: PipelineMetrics::default(),
        }
    }

    /// Registers `source` and returns its index within the pipeline.
    ///
    /// # Errors
    ///
    /// Returns `ChartError::DataError(DataError::BUFFER_FULL)` when `max_sources` has
    /// been reached or the backing vector has no room left.
    pub fn add_source(&mut self, source: StreamingDataSource<N>) -> ChartResult<usize> {
        if self.sources.len() >= self.config.max_sources {
            return Err(ChartError::DataError(DataError::BUFFER_FULL));
        }
        let index = self.sources.len();
        self.sources
            .push(source)
            .map_err(|_| ChartError::DataError(DataError::BUFFER_FULL))?;
        Ok(index)
    }

    /// Advances the animator by `delta_time` and returns whether it reported a change.
    ///
    /// When monitoring is enabled the elapsed logical time is folded into the latency
    /// average.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the animator.
    pub fn update(&mut self, delta_time: crate::time::Milliseconds) -> ChartResult<bool> {
        let start_time = self.get_current_time_us();
        let updated = self.animation.update_with_delta(delta_time)?;
        if self.config.monitoring_enabled {
            let end_time = self.get_current_time_us();
            // wrapping_sub keeps the elapsed value correct if the u32 clock rolls over.
            self.update_pipeline_metrics(end_time.wrapping_sub(start_time));
        }
        Ok(updated)
    }

    /// Iterates over the animator's current data, yielding values of `T`.
    pub fn current_data(&self) -> impl Iterator<Item = T> + '_ {
        self.animation.current_data()
    }

    /// Pipeline metrics collected so far.
    pub fn metrics(&self) -> &PipelineMetrics {
        &self.metrics
    }

    /// Number of registered sources.
    pub fn source_count(&self) -> usize {
        self.sources.len()
    }

    /// Folds `latency_us` into the running average with a 7/8 weight on the old value.
    /// An average of `0` is treated as "no sample yet" and is replaced outright.
    fn update_pipeline_metrics(&mut self, latency_us: u32) {
        let previous = self.metrics.avg_pipeline_latency_us;
        self.metrics.avg_pipeline_latency_us = if previous == 0 {
            latency_us
        } else {
            // Widen to u64 so `previous * 7 + latency_us` cannot overflow; the quotient
            // is at most u32::MAX, so narrowing back is lossless.
            ((u64::from(previous) * 7 + u64::from(latency_us)) / 8) as u32
        };
    }

    /// Logical clock: the processed-item counter, deliberately truncated to `u32`.
    fn get_current_time_us(&self) -> u32 {
        self.metrics.total_processed as u32
    }
}
/// A single identified data source backed by its own `UnifiedStreamingBuffer`.
pub struct StreamingDataSource<const N: usize> {
    /// Storage for the points accepted by this source.
    buffer: UnifiedStreamingBuffer<N>,
    /// Caller-supplied identifier.
    id: u32,
    /// Source settings.
    config: SourceConfig,
    /// Current state; points are only accepted while it is `SourceState::Active`.
    state: SourceState,
}
/// Settings for a `StreamingDataSource`.
#[derive(Clone, Copy, Debug)]
pub struct SourceConfig {
    /// Update interval; copied into the source buffer's `StreamingConfig`.
    pub update_interval: u32,
    /// Source priority; not read by the code visible in this file.
    pub priority: u8,
    /// When `true`, points with non-finite coordinates are rejected.
    pub validate_data: bool,
    /// Error budget; not read by the code visible in this file.
    pub max_errors: u32,
}
/// Lifecycle state of a `StreamingDataSource`.
///
/// `Active` is the only state in which the source accepts points; the code visible in
/// this file treats every other state the same way (updates are refused).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SourceState {
    Active,
    Paused,
    Error,
    Disabled,
}
impl Default for SourceConfig {
fn default() -> Self {
Self {
update_interval: 100,
priority: 1,
validate_data: true,
max_errors: 5,
}
}
}
impl<const N: usize> StreamingDataSource<N> {
    /// Creates an active source with `SourceConfig::default()`.
    pub fn new(id: u32) -> Self {
        Self::with_config(id, SourceConfig::default())
    }

    /// Creates an active source whose buffer uses default streaming settings except
    /// for `update_interval`, which is taken from `config`.
    pub fn with_config(id: u32, config: SourceConfig) -> Self {
        let buffer = UnifiedStreamingBuffer::with_config(StreamingConfig {
            update_interval: config.update_interval,
            ..Default::default()
        });
        Self {
            buffer,
            id,
            config,
            state: SourceState::Active,
        }
    }

    /// Feeds one point into the source's buffer.
    ///
    /// # Errors
    ///
    /// Returns `ChartError::DataError(DataError::INVALID_DATA_POINT)` when the source is
    /// not `Active`, or when validation is enabled and a coordinate is not finite.
    /// Errors from the underlying buffer push are passed through.
    pub fn update(&mut self, point: Point2D) -> ChartResult<()> {
        let is_active = matches!(self.state, SourceState::Active);
        if !is_active {
            return Err(ChartError::DataError(DataError::INVALID_DATA_POINT));
        }

        let must_validate = self.config.validate_data;
        if must_validate && !self.is_valid_point(&point) {
            return Err(ChartError::DataError(DataError::INVALID_DATA_POINT));
        }

        self.buffer.push(point)
    }

    /// Iterates over the points held by this source.
    pub fn data(&self) -> impl Iterator<Item = Point2D> + '_ {
        self.buffer.data()
    }

    /// Identifier given at construction.
    pub fn id(&self) -> u32 {
        self.id
    }

    /// Current state.
    pub fn state(&self) -> SourceState {
        self.state
    }

    /// Overwrites the current state.
    pub fn set_state(&mut self, state: SourceState) {
        self.state = state;
    }

    /// Source settings.
    pub fn config(&self) -> &SourceConfig {
        &self.config
    }

    /// Metrics of the underlying buffer.
    pub fn metrics(&self) -> &StreamingMetrics {
        self.buffer.metrics()
    }

    /// A point is valid when both coordinates are finite (neither NaN nor infinite).
    fn is_valid_point(&self, point: &Point2D) -> bool {
        let x_ok = point.x.is_finite();
        let y_ok = point.y.is_finite();
        x_ok && y_ok
    }
}
/// Registry of up to `MAX_CHARTS` chart instances with shared metrics and sync state.
pub struct StreamingChartManager<const MAX_CHARTS: usize> {
    /// Registered chart instances.
    charts: Vec<ChartInstance, MAX_CHARTS>,
    /// Manager-wide settings.
    config: ManagerConfig,
    /// Running manager metrics.
    metrics: ManagerMetrics,
    /// Synchronization bookkeeping.
    sync_state: SyncState,
}
/// Settings for a `StreamingChartManager`.
#[derive(Clone, Copy, Debug)]
pub struct ManagerConfig {
    /// Global update rate (presumably in Hz — confirm); not read by the code visible here.
    pub global_update_rate: u32,
    /// When `true`, each manager update refreshes the sync state.
    pub enable_sync: bool,
    /// Memory strategy; stored but not interpreted by the code visible in this file.
    pub memory_strategy: MemoryStrategy,
    /// Monitoring level; `MonitoringLevel::None` turns off latency tracking in `update`.
    pub monitoring_level: MonitoringLevel,
}
/// Memory strategy selectable in `ManagerConfig`.
///
/// The code visible in this file only stores this value; the meaning of each variant is
/// implied by its name and should be confirmed against the code that consumes it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MemoryStrategy {
    Conservative,
    Balanced,
    Performance,
}
/// Monitoring level selectable in `ManagerConfig`.
///
/// In the code visible here only the distinction between `None` and "anything else"
/// matters: `None` disables latency tracking, `Basic` and `Detailed` both enable it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum MonitoringLevel {
    None,
    Basic,
    Detailed,
}
/// One chart registered with a `StreamingChartManager`.
#[derive(Debug)]
pub struct ChartInstance {
    /// Identifier of this chart instance.
    pub id: u32,
    /// Kind of chart.
    pub chart_type: ChartType,
    /// Identifier of the pipeline this chart is associated with (caller-supplied).
    pub pipeline_id: u32,
    /// Per-instance settings.
    pub config: ChartInstanceConfig,
}
/// Kind of chart a `ChartInstance` represents.
///
/// The code visible in this file stores the value without branching on it.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ChartType {
    Line,
    Bar,
    Scatter,
    Gauge,
    Custom,
}
/// Per-chart settings stored in a `ChartInstance`.
///
/// None of these fields is interpreted by the code visible in this file.
#[derive(Clone, Copy, Debug)]
pub struct ChartInstanceConfig {
    /// Chart priority.
    pub priority: u8,
    /// Whether animations are enabled for this chart.
    pub animations_enabled: bool,
    /// Memory limit for this chart, in bytes.
    pub memory_limit_bytes: usize,
}
/// Counters and running figures kept by a `StreamingChartManager`.
#[derive(Clone, Copy, Debug, Default)]
pub struct ManagerMetrics {
    /// Number of charts added so far; the manager also uses it as the next chart id.
    pub total_charts: u32,
    /// Number of charts counted as active (incremented on every successful add).
    pub active_charts: u32,
    /// Number of completed manager updates; also drives the manager's logical clock.
    pub total_updates: u64,
    /// Smoothed update latency in microseconds (`0` means no sample recorded).
    pub avg_update_latency_us: u32,
    /// Total memory usage; not written by the code visible in this file.
    pub total_memory_usage: usize,
}
/// Synchronization bookkeeping kept by a `StreamingChartManager`.
#[derive(Clone, Copy, Debug, Default)]
pub struct SyncState {
    /// Logical time of the most recent sync refresh.
    pub last_sync_time: u32,
    /// Number of pending syncs; not written by the code visible in this file.
    pub pending_sync_count: u32,
    /// Sync drift in microseconds; not written by the code visible in this file.
    pub sync_drift_us: i32,
}
impl Default for ManagerConfig {
fn default() -> Self {
Self {
global_update_rate: 30,
enable_sync: true,
memory_strategy: MemoryStrategy::Balanced,
monitoring_level: MonitoringLevel::Basic,
}
}
}
impl Default for ChartInstanceConfig {
fn default() -> Self {
Self {
priority: 1,
animations_enabled: true,
memory_limit_bytes: 4096,
}
}
}
impl<const MAX_CHARTS: usize> StreamingChartManager<MAX_CHARTS> {
    /// Creates an empty manager with `ManagerConfig::default()`.
    pub fn new() -> Self {
        Self::with_config(ManagerConfig::default())
    }

    /// Creates an empty manager that uses `config`.
    pub fn with_config(config: ManagerConfig) -> Self {
        Self {
            charts: Vec::new(),
            config,
            metrics: ManagerMetrics::default(),
            sync_state: SyncState::default(),
        }
    }

    /// Registers a chart and returns its id.
    ///
    /// Ids are handed out from the running `total_charts` counter, so the first chart
    /// gets id `0`.
    ///
    /// # Errors
    ///
    /// Returns `ChartError::DataError(DataError::BUFFER_FULL)` when `MAX_CHARTS` charts
    /// are already registered.
    pub fn add_chart(
        &mut self,
        chart_type: ChartType,
        pipeline_id: u32,
        config: ChartInstanceConfig,
    ) -> ChartResult<u32> {
        if self.charts.len() >= MAX_CHARTS {
            return Err(ChartError::DataError(DataError::BUFFER_FULL));
        }
        let chart_id = self.metrics.total_charts;
        let instance = ChartInstance {
            id: chart_id,
            chart_type,
            pipeline_id,
            config,
        };
        self.charts
            .push(instance)
            .map_err(|_| ChartError::DataError(DataError::BUFFER_FULL))?;
        self.metrics.total_charts += 1;
        self.metrics.active_charts += 1;
        Ok(chart_id)
    }

    /// Performs one manager tick: refreshes the sync state (if enabled), bumps the
    /// update counter and, unless monitoring is `None`, records the tick latency.
    ///
    /// # Errors
    ///
    /// Propagates errors from the sync-state refresh.
    pub fn update(&mut self, delta_time: crate::time::Milliseconds) -> ChartResult<()> {
        let start_time = self.get_current_time_us();
        if self.config.enable_sync {
            self.update_sync_state(delta_time)?;
        }
        self.metrics.total_updates += 1;
        if self.config.monitoring_level != MonitoringLevel::None {
            let end_time = self.get_current_time_us();
            // The logical clock is a wrapping u32; wrapping_sub yields the correct
            // elapsed value even when the clock rolled over between the two reads.
            self.update_manager_metrics(end_time.wrapping_sub(start_time));
        }
        Ok(())
    }

    /// Manager metrics collected so far.
    pub fn metrics(&self) -> &ManagerMetrics {
        &self.metrics
    }

    /// Number of charts currently counted as active.
    pub fn active_chart_count(&self) -> usize {
        self.metrics.active_charts as usize
    }

    /// Current synchronization bookkeeping.
    pub fn sync_state(&self) -> &SyncState {
        &self.sync_state
    }

    /// Stamps the sync state with the current logical time; `_delta_time` is unused.
    fn update_sync_state(&mut self, _delta_time: crate::time::Milliseconds) -> ChartResult<()> {
        self.sync_state.last_sync_time = self.get_current_time_ms();
        Ok(())
    }

    /// Folds `latency_us` into the running average with a 7/8 weight on the old value.
    /// An average of `0` is treated as "no sample yet" and is replaced outright.
    fn update_manager_metrics(&mut self, latency_us: u32) {
        let previous = self.metrics.avg_update_latency_us;
        self.metrics.avg_update_latency_us = if previous == 0 {
            latency_us
        } else {
            // Widen to u64 so `previous * 7 + latency_us` cannot overflow; the quotient
            // is at most u32::MAX, so narrowing back is lossless.
            ((u64::from(previous) * 7 + u64::from(latency_us)) / 8) as u32
        };
    }

    /// Logical clock in "milliseconds": the update counter, deliberately truncated to
    /// `u32`. No hardware timer is read here.
    fn get_current_time_ms(&self) -> u32 {
        self.metrics.total_updates as u32
    }

    /// Logical clock in "microseconds". Wraps on overflow instead of panicking, which
    /// would otherwise happen once the update counter exceeds `u32::MAX / 1000`.
    fn get_current_time_us(&self) -> u32 {
        self.get_current_time_ms().wrapping_mul(1000)
    }
}
impl<const MAX_CHARTS: usize> Default for StreamingChartManager<MAX_CHARTS> {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Pushing points grows the buffer and exposes them through `data()`.
    #[test]
    fn test_unified_streaming_buffer() {
        let mut stream: UnifiedStreamingBuffer<10> = UnifiedStreamingBuffer::new();
        assert!(stream.is_empty());
        assert_eq!(stream.capacity(), 10);

        stream.push(Point2D::new(1.0, 2.0)).unwrap();
        stream.push(Point2D::new(2.0, 3.0)).unwrap();
        assert_eq!(stream.len(), 2);
        assert!(!stream.is_empty());

        let collected: Vec<Point2D, 10> = stream.data().collect();
        assert_eq!(collected.len(), 2);
    }

    // A source accepts points while active and refuses them once paused.
    #[test]
    fn test_streaming_data_source() {
        let mut feed: StreamingDataSource<5> = StreamingDataSource::new(1);
        assert_eq!(feed.id(), 1);
        assert_eq!(feed.state(), SourceState::Active);

        feed.update(Point2D::new(1.0, 2.0)).unwrap();
        assert_eq!(feed.data().count(), 1);

        feed.set_state(SourceState::Paused);
        assert_eq!(feed.state(), SourceState::Paused);

        let outcome = feed.update(Point2D::new(2.0, 3.0));
        assert!(outcome.is_err());
    }

    // Adding a chart yields id 0 and an update bumps the update counter.
    #[test]
    fn test_streaming_chart_manager() {
        let mut registry: StreamingChartManager<5> = StreamingChartManager::new();
        assert_eq!(registry.active_chart_count(), 0);

        let first_id = registry
            .add_chart(ChartType::Line, 1, ChartInstanceConfig::default())
            .unwrap();
        assert_eq!(first_id, 0);
        assert_eq!(registry.active_chart_count(), 1);

        registry.update(16).unwrap();
        assert!(registry.metrics().total_updates > 0);
    }

    // A custom configuration is stored verbatim by the buffer.
    #[test]
    fn test_streaming_config() {
        let custom = StreamingConfig {
            buffer_capacity: 50,
            update_interval: 50,
            auto_prune: false,
            ..Default::default()
        };
        let stream: UnifiedStreamingBuffer<50> = UnifiedStreamingBuffer::with_config(custom);
        let stored = stream.config();
        assert_eq!(stored.buffer_capacity, 50);
        assert_eq!(stored.update_interval, 50);
        assert!(!stored.auto_prune);
    }

    // Accepted points are counted and nothing is reported as dropped.
    #[test]
    fn test_performance_metrics() {
        let mut stream: UnifiedStreamingBuffer<10> = UnifiedStreamingBuffer::new();
        for step in 0..5 {
            let x = step as f32;
            let y = (step * 2) as f32;
            stream.push(Point2D::new(x, y)).unwrap();
        }
        let stats = stream.metrics();
        assert_eq!(stats.total_points, 5);
        assert_eq!(stats.dropped_points, 0);
    }
}