frequenz_microgrid/microgrid/telemetry_tracker/
inverter_battery_group_telemetry_tracker.rs1use std::{
11 collections::{HashMap, HashSet},
12 time::Duration,
13};
14
15use tokio::select;
16
17use crate::{
18 Error, MicrogridClientHandle,
19 client::proto::common::microgrid::electrical_components::{
20 ElectricalComponentStateCode, ElectricalComponentTelemetry,
21 },
22 microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup,
23};
24
25use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker};
26
27#[derive(Clone)]
38pub(crate) struct InverterBatteryGroupTelemetryTracker {
39 inverter_battery_group: InverterBatteryGroup,
40 status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>,
41 missing_data_tolerance: Duration,
42 healthy_state_codes: HashSet<ElectricalComponentStateCode>,
43 client: MicrogridClientHandle,
44}
45
46#[derive(Clone, Debug, PartialEq)]
55pub struct InverterBatteryGroupStatus {
56 pub healthy_inverters: HashMap<u64, ElectricalComponentTelemetry>,
57 pub healthy_batteries: HashMap<u64, ElectricalComponentTelemetry>,
58 pub unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>>,
59 pub unhealthy_batteries: HashMap<u64, Option<ElectricalComponentTelemetry>>,
60}
61
62impl InverterBatteryGroupTelemetryTracker {
63 pub(crate) fn new(
64 inverter_battery_group: InverterBatteryGroup,
65 missing_data_tolerance: Duration,
66 healthy_state_codes: HashSet<ElectricalComponentStateCode>,
67 client: MicrogridClientHandle,
68 status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>,
69 ) -> Self {
70 Self {
71 inverter_battery_group,
72 status_tx,
73 missing_data_tolerance,
74 healthy_state_codes,
75 client,
76 }
77 }
78
79 pub async fn run(self) -> Result<(), Error> {
80 let mut healthy_inverters = HashMap::new();
81 let mut unhealthy_inverters = HashMap::new();
82 let mut healthy_batteries = HashMap::new();
83 let mut unhealthy_batteries = HashMap::new();
84
85 let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100);
86
87 for &inverter_id in &self.inverter_battery_group.inverter_ids {
88 let component_data_stream = self
89 .client
90 .receive_electrical_component_telemetry_stream(inverter_id)
91 .await?;
92 let tracker = ComponentTelemetryTracker::new(
93 inverter_id,
94 self.missing_data_tolerance,
95 self.healthy_state_codes.clone(),
96 component_data_stream,
97 inverter_status_tx.clone(),
98 );
99 tokio::spawn(async move {
101 tracker.run().await;
102 });
103 unhealthy_inverters.insert(inverter_id, None);
105 }
106
107 let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100);
108
109 for &battery_id in &self.inverter_battery_group.battery_ids {
110 let component_data_stream = self
111 .client
112 .receive_electrical_component_telemetry_stream(battery_id)
113 .await?;
114 let tracker = ComponentTelemetryTracker::new(
115 battery_id,
116 self.missing_data_tolerance,
117 self.healthy_state_codes.clone(),
118 component_data_stream,
119 battery_status_tx.clone(),
120 );
121 tokio::spawn(async move {
123 tracker.run().await;
124 });
125 unhealthy_batteries.insert(battery_id, None);
127 }
128
129 drop(inverter_status_tx);
133 drop(battery_status_tx);
134
135 loop {
136 select! {
137 inverter_status = inverter_status_rx.recv() => {
138 let Some(inverter_status) = inverter_status else {
139 let e = String::from("Inverter telemetry tracker stopped receiving status updates.");
140 tracing::error!("{}", e);
141 return Err(Error::component_data_error(e));
142 };
143 match inverter_status {
144 ComponentHealthStatus::Healthy(component_id, data) => {
145 healthy_inverters.insert(component_id, data);
146 unhealthy_inverters.remove(&component_id);
147 }
148 ComponentHealthStatus::Unhealthy(component_id, data) => {
149 unhealthy_inverters.insert(component_id, data);
150 healthy_inverters.remove(&component_id);
151 }
152 }
153 },
154 battery_status = battery_status_rx.recv() => {
155 let Some(battery_status) = battery_status else {
156 let e = String::from(
157 "Battery telemetry tracker stopped receiving status updates."
158 );
159 tracing::error!("{}", e);
160 return Err(Error::component_data_error(e));
161 };
162 match battery_status {
163 ComponentHealthStatus::Healthy(component_id, data) => {
164 healthy_batteries.insert(component_id, data);
165 unhealthy_batteries.remove(&component_id);
166 }
167 ComponentHealthStatus::Unhealthy(component_id, data) => {
168 unhealthy_batteries.insert(component_id, data);
169 healthy_batteries.remove(&component_id);
170 }
171 }
172 }
173 }
174 if let Err(e) = self
175 .status_tx
176 .send((
177 self.inverter_battery_group.clone(),
178 InverterBatteryGroupStatus {
179 healthy_inverters: healthy_inverters.clone(),
180 healthy_batteries: healthy_batteries.clone(),
181 unhealthy_inverters: unhealthy_inverters.clone(),
182 unhealthy_batteries: unhealthy_batteries.clone(),
183 },
184 ))
185 .await
186 {
187 tracing::error!("Failed to send inverter-battery group status: {}", e);
188 return Err(Error::component_data_error(format!(
189 "Failed to send inverter-battery group status: {}",
190 e
191 )));
192 }
193 }
194 }
195}