use std::{collections::HashSet, time::Duration};
use tokio::{
select,
sync::{broadcast, mpsc},
};
use crate::client::proto::common::microgrid::electrical_components::{
ElectricalComponentStateCode, ElectricalComponentTelemetry,
};
/// Tracks the health of a single electrical component from its telemetry
/// stream.
///
/// The tracker reads telemetry from a broadcast channel, classifies every
/// sample as healthy or unhealthy, and publishes the resulting
/// [`ComponentHealthStatus`] on an mpsc channel.
pub(crate) struct ComponentTelemetryTracker {
/// ID of the tracked component; attached to `Unhealthy` reports and log
/// messages.
component_id: u64,
/// Period of the timer that publishes `Unhealthy(_, None)`; the timer is
/// reset whenever a telemetry sample is received.
missing_data_tolerance: Duration,
/// Source of telemetry samples for the tracked component.
component_data_rx: broadcast::Receiver<ElectricalComponentTelemetry>,
/// Sink for the health statuses derived by the tracker.
component_status_tx: mpsc::Sender<ComponentHealthStatus>,
/// State codes regarded as healthy; a sample containing any other code is
/// classified as unhealthy.
healthy_state_codes: HashSet<ElectricalComponentStateCode>,
}
/// Health verdict published by a [`ComponentTelemetryTracker`].
#[derive(PartialEq, Clone, Debug)]
pub(crate) enum ComponentHealthStatus {
/// The sample reported no errors and only healthy state codes. Carries the
/// component ID taken from the sample and the sample itself.
Healthy(u64, ElectricalComponentTelemetry),
/// The component is considered unhealthy. Carries the tracked component ID
/// and the offending sample, or `None` when the status was produced by the
/// missing-data timer rather than by a sample.
Unhealthy(u64, Option<ElectricalComponentTelemetry>),
}
impl ComponentTelemetryTracker {
pub(super) fn new(
component_id: u64,
missing_data_tolerance: Duration,
healthy_state_codes: HashSet<ElectricalComponentStateCode>,
component_data_rx: broadcast::Receiver<ElectricalComponentTelemetry>,
component_status_tx: mpsc::Sender<ComponentHealthStatus>,
) -> Self {
Self {
component_id,
missing_data_tolerance,
component_data_rx,
component_status_tx,
healthy_state_codes,
}
}
fn state_from_data(&self, data: ElectricalComponentTelemetry) -> ComponentHealthStatus {
for state in data.state_snapshots.iter() {
if !state.errors.is_empty() {
return ComponentHealthStatus::Unhealthy(self.component_id, Some(data));
}
for state in state.states.iter() {
let Ok(state) = ElectricalComponentStateCode::try_from(*state) else {
tracing::warn!(
"Component {} has an invalid state code: {}",
self.component_id,
state
);
return ComponentHealthStatus::Unhealthy(self.component_id, Some(data));
};
if !self.healthy_state_codes.contains(&state) {
return ComponentHealthStatus::Unhealthy(self.component_id, Some(data));
}
}
}
ComponentHealthStatus::Healthy(data.electrical_component_id, data)
}
pub async fn run(mut self) {
let mut interval = tokio::time::interval(self.missing_data_tolerance);
loop {
select! {
component_data = self.component_data_rx.recv() => {
match component_data {
Ok(data) => {
interval.reset();
let status = self.state_from_data(data);
if let Err(e) = self.component_status_tx.send(status).await {
tracing::error!("Failed to send component status: {}", e);
}
}
Err(broadcast::error::RecvError::Lagged(_)) => {
continue;
}
Err(broadcast::error::RecvError::Closed) => {
drop(self.component_status_tx);
break;
}
}
}
_ = interval.tick() => {
let status = ComponentHealthStatus::Unhealthy(self.component_id, None);
if let Err(e) = self.component_status_tx.send(status).await {
tracing::error!("Failed to send component status: {}", e);
}
}
}
}
}
}