frequenz_microgrid/microgrid/telemetry_tracker/
pv_pool_telemetry_tracker.rs1use std::{
11 collections::{BTreeSet, HashMap, HashSet},
12 time::Duration,
13};
14
15use tokio::sync::{broadcast, mpsc};
16
17use crate::{
18 Error, MicrogridClientHandle,
19 client::proto::common::microgrid::electrical_components::{
20 ElectricalComponentStateCode, ElectricalComponentTelemetry,
21 },
22};
23
24use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker};
25
/// Point-in-time view of the inverters of a PV pool, split by health status.
///
/// As produced by `PvPoolTelemetryTracker::run`, an inverter ID is moved
/// between the two maps whenever its reported status changes, so it appears in
/// only one of them at a time.
#[derive(Clone, Debug, PartialEq)]
pub struct PvPoolSnapshot {
    /// Inverters last reported as healthy, keyed by component ID, together
    /// with the telemetry that accompanied that report.
    pub healthy_inverters: HashMap<u64, ElectricalComponentTelemetry>,
    /// Inverters last reported as unhealthy, or not reported on yet, keyed by
    /// component ID. The value is `None` when no telemetry is available, for
    /// example before the first status update for that inverter has arrived.
    pub unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>>,
}
39
/// Aggregates per-inverter health statuses into [`PvPoolSnapshot`]s and
/// publishes them on a broadcast channel.
#[derive(Clone)]
pub struct PvPoolTelemetryTracker {
    /// IDs of the components whose telemetry streams are tracked.
    component_ids: BTreeSet<u64>,
    /// Broadcast channel on which pool snapshots are published.
    component_pool_status_tx: broadcast::Sender<PvPoolSnapshot>,
    /// Forwarded unchanged to every per-component tracker.
    // NOTE(review): presumably how long telemetry may be missing before a
    // component is reported unhealthy — confirm in `ComponentTelemetryTracker`.
    missing_data_tolerance: Duration,
    /// Forwarded (cloned) to every per-component tracker.
    // NOTE(review): presumably the state codes that count as healthy — confirm
    // in `ComponentTelemetryTracker`.
    healthy_state_codes: HashSet<ElectricalComponentStateCode>,
    /// Client handle used to open the telemetry stream of each component.
    client: MicrogridClientHandle,
}
51
52impl PvPoolTelemetryTracker {
53 pub(crate) fn new(
54 component_ids: BTreeSet<u64>,
55 missing_data_tolerance: Duration,
56 healthy_state_codes: HashSet<ElectricalComponentStateCode>,
57 client: MicrogridClientHandle,
58 component_pool_status_tx: broadcast::Sender<PvPoolSnapshot>,
59 ) -> Self {
60 Self {
61 component_ids,
62 component_pool_status_tx,
63 missing_data_tolerance,
64 healthy_state_codes,
65 client,
66 }
67 }
68
69 pub async fn run(self) -> Result<(), Error> {
70 if self.component_ids.is_empty() {
71 let e = "No component IDs provided for PvPoolTelemetryTracker".to_string();
72 tracing::error!("{}", e);
73 return Err(Error::component_data_error(e));
74 }
75
76 let mut healthy_inverters: HashMap<u64, ElectricalComponentTelemetry> = HashMap::new();
77 let mut unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>> =
78 HashMap::new();
79
80 let (status_tx, mut status_rx) = mpsc::channel(100);
81 for &inverter_id in &self.component_ids {
82 let component_data_stream = self
83 .client
84 .receive_electrical_component_telemetry_stream(inverter_id)
85 .await?;
86 let tracker = ComponentTelemetryTracker::new(
87 inverter_id,
88 self.missing_data_tolerance,
89 self.healthy_state_codes.clone(),
90 component_data_stream,
91 status_tx.clone(),
92 );
93 tokio::spawn(async move {
95 tracker.run().await;
96 });
97 unhealthy_inverters.insert(inverter_id, None);
99 }
100
101 drop(status_tx);
104
105 let mut interval = tokio::time::interval(Duration::from_millis(200));
106 let mut last_sent: Option<PvPoolSnapshot> = None;
107
108 loop {
109 tokio::select! {
110 maybe_status = status_rx.recv() => {
111 match maybe_status {
112 Some(ComponentHealthStatus::Healthy(id, data)) => {
113 healthy_inverters.insert(id, data);
114 unhealthy_inverters.remove(&id);
115 }
116 Some(ComponentHealthStatus::Unhealthy(id, data)) => {
117 unhealthy_inverters.insert(id, data);
118 healthy_inverters.remove(&id);
119 }
120 None => break,
125 }
126 },
127 _ = interval.tick() => {
128 let unchanged = last_sent.as_ref().is_some_and(|s| {
130 s.healthy_inverters == healthy_inverters
131 && s.unhealthy_inverters == unhealthy_inverters
132 });
133 if unchanged {
134 continue;
135 }
136 let snapshot = PvPoolSnapshot {
137 healthy_inverters: healthy_inverters.clone(),
138 unhealthy_inverters: unhealthy_inverters.clone(),
139 };
140 if let Err(e) = self.component_pool_status_tx.send(snapshot.clone()) {
141 tracing::error!("Failed to send PV pool snapshot: {}", e);
142 break;
143 }
144 last_sent = Some(snapshot);
145 },
146 }
147 }
148
149 let err = format!(
150 "PvPoolTelemetryTracker (component IDs {:?}) stopped receiving inverter telemetry updates.",
151 self.component_ids
152 );
153 tracing::error!("{}", err);
154 Err(Error::component_data_error(err))
155 }
156}
157
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::TimeDelta;

    use super::PvPoolSnapshot;
    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
        client::{
            proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
            test_utils::{MockComponent, MockMicrogridApiClient},
        },
        microgrid::pv_pool::PvPool,
    };

    /// Builds a [`PvPool`] on top of a mock API client serving `graph`.
    async fn new_pool(graph: MockComponent) -> PvPool {
        let client = MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(graph));
        let config = LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap());
        let logical_meter = LogicalMeterHandle::try_new(client.clone(), config)
            .await
            .unwrap();
        PvPool::try_new(None, client, logical_meter).unwrap()
    }

    /// Builds a pool for the graph `grid(1) -> meter(2) -> inverters`.
    async fn pool_with_inverters(inverters: Vec<MockComponent>) -> PvPool {
        let meter = MockComponent::meter(2).with_children(inverters);
        new_pool(MockComponent::grid(1).with_children(vec![meter])).await
    }

    /// Advances the paused clock `steps` times by 100 ms, draining `rx` after
    /// every step, and returns the most recent snapshot seen.
    ///
    /// Panics if no snapshot was received at all.
    async fn last_snapshot(
        rx: &mut tokio::sync::broadcast::Receiver<PvPoolSnapshot>,
        steps: u32,
    ) -> PvPoolSnapshot {
        let step = Duration::from_millis(100);
        let mut latest = None;
        for _ in 0..steps {
            tokio::time::advance(step).await;
            // Drain until the first failed `try_recv`, keeping the newest item.
            let newest = std::iter::from_fn(|| rx.try_recv().ok()).last();
            latest = newest.or(latest);
        }
        latest.expect("no snapshot received")
    }

    #[tokio::test(start_paused = true)]
    async fn single_inverter_reaches_healthy_state() {
        let inverters = vec![MockComponent::pv_inverter(3).with_power(vec![0.0; 6])];
        let mut pool = pool_with_inverters(inverters).await;

        let mut snapshots = pool.telemetry_snapshots();
        let latest = last_snapshot(&mut snapshots, 10).await;

        assert!(latest.healthy_inverters.contains_key(&3));
        assert!(latest.unhealthy_inverters.is_empty());
    }

    #[tokio::test(start_paused = true)]
    async fn two_inverters_both_appear_in_snapshot() {
        let inverters = vec![
            MockComponent::pv_inverter(3).with_power(vec![0.0; 6]),
            MockComponent::pv_inverter(4).with_power(vec![0.0; 6]),
        ];
        let mut pool = pool_with_inverters(inverters).await;

        let mut snapshots = pool.telemetry_snapshots();
        let latest = last_snapshot(&mut snapshots, 10).await;

        for id in [3, 4] {
            assert!(latest.healthy_inverters.contains_key(&id));
        }
        assert!(latest.unhealthy_inverters.is_empty());
    }

    #[tokio::test(start_paused = true)]
    async fn calling_telemetry_snapshots_twice_reuses_sender() {
        let inverters = vec![MockComponent::pv_inverter(3).with_power(vec![0.0; 6])];
        let mut pool = pool_with_inverters(inverters).await;

        let mut first_rx = pool.telemetry_snapshots();
        let mut second_rx = pool.telemetry_snapshots();

        tokio::time::advance(Duration::from_millis(300)).await;

        let from_first = first_rx.recv().await.unwrap();
        let from_second = second_rx.recv().await.unwrap();
        assert_eq!(
            from_first, from_second,
            "both subscriptions should observe the same snapshot"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn inverter_becomes_unhealthy_when_data_stops() {
        let silent_inverter = MockComponent::pv_inverter(3)
            .with_power(vec![0.0; 3])
            .with_silence_after_metrics();
        let mut pool = pool_with_inverters(vec![silent_inverter]).await;

        let mut snapshots = pool.telemetry_snapshots();

        // While the initial samples are flowing the inverter is healthy.
        let before = last_snapshot(&mut snapshots, 10).await;
        assert!(
            before.healthy_inverters.contains_key(&3),
            "expected inverter to go healthy after initial samples, got {:?}",
            before
        );

        // Let enough time pass without data, then look at the newest snapshot.
        tokio::time::advance(Duration::from_secs(15)).await;
        let after = last_snapshot(&mut snapshots, 5).await;

        assert!(
            after.healthy_inverters.is_empty(),
            "inverter should be unhealthy after data stops, got healthy set {:?}",
            after.healthy_inverters.keys()
        );
        assert!(after.unhealthy_inverters.contains_key(&3));
    }

    #[tokio::test(start_paused = true)]
    async fn inverter_with_bad_state_is_unhealthy() {
        let faulty_inverter = MockComponent::pv_inverter(3)
            .with_power(vec![0.0; 6])
            .with_state(ElectricalComponentStateCode::Error);
        let mut pool = pool_with_inverters(vec![faulty_inverter]).await;

        let mut snapshots = pool.telemetry_snapshots();
        let latest = last_snapshot(&mut snapshots, 10).await;

        assert!(
            !latest.healthy_inverters.contains_key(&3),
            "inverter with Error state should not be in healthy set"
        );
        assert!(
            latest.unhealthy_inverters.contains_key(&3),
            "inverter with Error state should be in unhealthy set, got {:?}",
            latest
        );
    }
}