1use std::{
7 collections::{BTreeSet, HashMap, HashSet},
8 time::Duration,
9};
10
11use crate::{
12 Error, LogicalMeterHandle, MicrogridClientHandle,
13 client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
14 microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::{
15 InverterBatteryGroupStatus, InverterBatteryGroupTelemetryTracker,
16 },
17};
18
/// A set of inverters together with the batteries attached to them.
///
/// Groups produced by `BatteryPoolTelemetryTracker::get_inverter_battery_groups` are
/// closed: every battery in `battery_ids` is fed only by inverters in `inverter_ids`, and
/// every component downstream of those inverters appears in `battery_ids`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct InverterBatteryGroup {
    /// Component IDs of the inverters in this group.
    pub inverter_ids: BTreeSet<u64>,
    /// Component IDs of the batteries connected to this group's inverters.
    pub battery_ids: BTreeSet<u64>,
}

impl InverterBatteryGroup {
    /// Builds a group from its inverter and battery component IDs.
    ///
    /// No connectivity validation happens here; the caller is responsible for passing a
    /// consistent pair of sets.
    pub(crate) fn new(inverter_ids: BTreeSet<u64>, battery_ids: BTreeSet<u64>) -> Self {
        Self {
            battery_ids,
            inverter_ids,
        }
    }
}
36
37#[derive(Clone, Debug, PartialEq)]
38pub struct BatteryPoolSnapshot(HashMap<InverterBatteryGroup, InverterBatteryGroupStatus>);
39
40impl BatteryPoolSnapshot {
41 pub fn groups(&self) -> &HashMap<InverterBatteryGroup, InverterBatteryGroupStatus> {
42 &self.0
43 }
44}
45
46#[derive(Clone)]
50pub struct BatteryPoolTelemetryTracker {
51 component_ids: BTreeSet<u64>,
52 component_pool_status_tx: tokio::sync::broadcast::Sender<BatteryPoolSnapshot>,
53 missing_data_tolerance: Duration,
54 healthy_state_codes: HashSet<ElectricalComponentStateCode>,
55 client: MicrogridClientHandle,
56 logical_meter: LogicalMeterHandle,
57}
58
59impl BatteryPoolTelemetryTracker {
60 pub(crate) fn new(
61 component_ids: BTreeSet<u64>,
62 missing_data_tolerance: Duration,
63 healthy_state_codes: HashSet<ElectricalComponentStateCode>,
64 client: MicrogridClientHandle,
65 logical_meter: LogicalMeterHandle,
66 component_pool_status_tx: tokio::sync::broadcast::Sender<BatteryPoolSnapshot>,
67 ) -> Self {
68 Self {
69 component_ids,
70 component_pool_status_tx,
71 missing_data_tolerance,
72 healthy_state_codes,
73 client,
74 logical_meter,
75 }
76 }
77
78 pub(crate) fn get_inverter_battery_groups(&self) -> Result<Vec<InverterBatteryGroup>, Error> {
79 if self.component_ids.is_empty() {
80 let e = "No component IDs provided for BatteryPoolTelemetryTracker".to_string();
81 tracing::error!("{}", e);
82 return Err(Error::component_data_error(e));
83 }
84 let mut unvisited_batteries = self.component_ids.clone();
85 let mut groups = Vec::new();
86
87 let graph = self.logical_meter.graph();
88
89 while let Some(battery_id) = unvisited_batteries.iter().next().cloned() {
90 let group_inverters = graph
91 .predecessors(battery_id)?
92 .filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter)
93 .map(|c| c.id)
94 .collect::<BTreeSet<_>>();
95
96 if group_inverters.is_empty() {
97 let e = format!("Battery {} is not connected to any inverters.", battery_id);
98 tracing::error!("{}", e);
99 return Err(Error::component_data_error(e));
100 }
101
102 let mut group_batteries = BTreeSet::new();
103 for inverter_id in &group_inverters {
104 let connected_batteries = graph
105 .successors(*inverter_id)?
106 .map(|c| c.id)
107 .collect::<BTreeSet<_>>();
108
109 group_batteries.extend(connected_batteries);
110 }
111
112 if !group_batteries.is_subset(&self.component_ids) {
114 let e = format!(
115 concat!(
116 "Inverters {:?} are connected to batteries {:?} which are not all in ",
117 "the requested component IDs {:?}"
118 ),
119 group_inverters, group_batteries, self.component_ids
120 );
121
122 tracing::error!("{}", e);
123 return Err(Error::component_data_error(e));
124 }
125
126 unvisited_batteries.retain(|b| !group_batteries.contains(b));
128
129 for battery_id in &group_batteries {
131 let connected_inverters = graph
132 .predecessors(*battery_id)?
133 .filter(|c| {
134 c.category() == crate::client::ElectricalComponentCategory::Inverter
135 })
136 .map(|c| c.id)
137 .collect::<BTreeSet<_>>();
138
139 if !connected_inverters.is_subset(&group_inverters) {
140 let e = format!(
141 "Battery {} is connected to inverters {:?} which are not all in the same group {:?}",
142 battery_id, connected_inverters, group_inverters
143 );
144 tracing::error!("{}", e);
145 return Err(Error::component_data_error(e));
146 }
147 }
148
149 groups.push(InverterBatteryGroup::new(group_inverters, group_batteries));
150 }
151
152 Ok(groups)
153 }
154
155 pub async fn run(self) -> Result<(), Error> {
156 let mut inverter_battery_group_data = HashMap::new();
157
158 let inverter_battery_group_ids = self.get_inverter_battery_groups()?;
159
160 let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100);
161 for inverter_battery_group in inverter_battery_group_ids {
162 let tracker = InverterBatteryGroupTelemetryTracker::new(
163 inverter_battery_group,
164 self.missing_data_tolerance,
165 self.healthy_state_codes.clone(),
166 self.client.clone(),
167 component_status_tx.clone(),
168 );
169 tokio::spawn(tracker.run());
171 }
172
173 drop(component_status_tx);
176
177 let mut interval = tokio::time::interval(Duration::from_millis(200));
178 let mut last_sent_status = None;
179
180 loop {
181 tokio::select! {
182 maybe_status = component_status_rx.recv() => {
183 match maybe_status {
184 Some((group_ids, status)) => {
185 inverter_battery_group_data.insert(group_ids, status);
186 }
187 None => break,
192 }
193 },
194 _ = interval.tick() => {
195 if last_sent_status.as_ref() == Some(&inverter_battery_group_data) {
196 continue; }
198 if let Err(e) = self.component_pool_status_tx.send(
199 BatteryPoolSnapshot(inverter_battery_group_data.clone())
200 )
201 {
202 tracing::error!("Failed to send pool snapshot: {}", e);
203 break;
204 }
205 last_sent_status = Some(inverter_battery_group_data.clone());
206 },
207 }
208 }
209
210 let err = format!(
211 "BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.",
212 self.component_ids
213 );
214
215 tracing::error!("{}", err);
216
217 Err(Error::component_data_error(err))
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::collections::{BTreeSet, HashMap, HashSet};

    use chrono::TimeDelta;

    use super::{BatteryPoolSnapshot, BatteryPoolTelemetryTracker};
    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
        client::{
            proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
            test_utils::{MockComponent, MockMicrogridApiClient},
        },
        microgrid::{
            battery_pool::BatteryPool,
            telemetry_tracker::{
                battery_pool_telemetry_tracker::InverterBatteryGroup,
                inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus,
            },
        },
    };

    impl BatteryPoolSnapshot {
        /// Test-only constructor for building expected snapshots.
        pub(crate) fn from_groups(
            groups: HashMap<InverterBatteryGroup, InverterBatteryGroupStatus>,
        ) -> Self {
            Self(groups)
        }
    }

    /// Builds a client and logical meter backed by a mock API serving `graph`.
    async fn new_client_and_meter(
        graph: MockComponent,
    ) -> (MicrogridClientHandle, LogicalMeterHandle) {
        let api = MockMicrogridApiClient::new(graph);
        let client = MicrogridClientHandle::new_from_client(api);
        let lm = LogicalMeterHandle::try_new(
            client.clone(),
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
        )
        .await
        .unwrap();
        (client, lm)
    }

    /// Builds a battery pool covering every battery in `graph`.
    async fn new_pool(graph: MockComponent) -> BatteryPool {
        let (client, lm) = new_client_and_meter(graph).await;
        BatteryPool::try_new(None, client, lm).unwrap()
    }

    /// Builds a tracker over `component_ids` without starting it.
    async fn new_tracker(
        graph: MockComponent,
        component_ids: BTreeSet<u64>,
    ) -> BatteryPoolTelemetryTracker {
        let (client, lm) = new_client_and_meter(graph).await;
        let (tx, _rx) = tokio::sync::broadcast::channel(8);
        BatteryPoolTelemetryTracker::new(
            component_ids,
            std::time::Duration::from_secs(1),
            HashSet::new(),
            client,
            lm,
            tx,
        )
    }

    /// Advances paused time by `steps` x 100 ms, draining `rx` after each step, and returns
    /// the most recent snapshot received.
    ///
    /// # Panics
    ///
    /// Panics if no snapshot was received at all.
    async fn last_snapshot(
        rx: &mut tokio::sync::broadcast::Receiver<BatteryPoolSnapshot>,
        steps: u32,
    ) -> BatteryPoolSnapshot {
        use tokio::sync::broadcast::error::TryRecvError;

        let mut last = None;
        for _ in 0..steps {
            tokio::time::advance(std::time::Duration::from_millis(100)).await;
            loop {
                match rx.try_recv() {
                    Ok(snap) => last = Some(snap),
                    // A lag only reports that older snapshots were overwritten; newer ones
                    // are still queued, so keep draining instead of stopping early.
                    Err(TryRecvError::Lagged(_)) => continue,
                    Err(_) => break,
                }
            }
        }
        last.expect("no snapshot received")
    }

    #[tokio::test(start_paused = true)]
    async fn single_group_reaches_healthy_state() {
        // grid -> meter -> inverter 3 -> battery 4
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::battery_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_children(vec![
                        MockComponent::battery(4)
                            .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                    ]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        let groups = snap.groups();
        assert_eq!(
            groups.len(),
            1,
            "expected exactly one inverter-battery group"
        );

        let (group, status) = groups.iter().next().unwrap();
        assert_eq!(group.inverter_ids, [3].into());
        assert_eq!(group.battery_ids, [4].into());
        assert!(status.healthy_inverters.contains_key(&3));
        assert!(status.healthy_batteries.contains_key(&4));
        assert!(status.unhealthy_inverters.is_empty());
        assert!(status.unhealthy_batteries.is_empty());
    }

    #[tokio::test(start_paused = true)]
    async fn two_disjoint_groups_both_appear_in_snapshot() {
        // Two independent inverter/battery pairs under the same meter.
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::battery_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_children(vec![
                        MockComponent::battery(4)
                            .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                    ]),
                MockComponent::battery_inverter(5)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_children(vec![
                        MockComponent::battery(6)
                            .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                    ]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        let groups = snap.groups();
        assert_eq!(groups.len(), 2);

        let all_inverters: BTreeSet<u64> = groups
            .keys()
            .flat_map(|g| g.inverter_ids.iter().copied())
            .collect();
        let all_batteries: BTreeSet<u64> = groups
            .keys()
            .flat_map(|g| g.battery_ids.iter().copied())
            .collect();
        assert_eq!(all_inverters, [3, 5].into());
        assert_eq!(all_batteries, [4, 6].into());

        for status in groups.values() {
            assert!(status.unhealthy_inverters.is_empty());
            assert!(status.unhealthy_batteries.is_empty());
        }
    }

    #[tokio::test(start_paused = true)]
    async fn calling_telemetry_snapshots_twice_reuses_sender() {
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::battery_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_children(vec![
                        MockComponent::battery(4)
                            .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                    ]),
            ]),
        ]))
        .await;

        let mut rx1 = pool.telemetry_snapshots();
        let mut rx2 = pool.telemetry_snapshots();

        tokio::time::advance(std::time::Duration::from_millis(300)).await;

        let snap1 = rx1.recv().await.unwrap();
        let snap2 = rx2.recv().await.unwrap();
        assert_eq!(
            snap1, snap2,
            "both subscriptions should observe the same snapshot"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn components_become_unhealthy_when_data_stops() {
        // Both components emit three samples and then go silent.
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::battery_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0])
                    .with_silence_after_metrics()
                    .with_children(vec![
                        MockComponent::battery(4)
                            .with_power(vec![0.0, 0.0, 0.0])
                            .with_silence_after_metrics(),
                    ]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();

        let healthy = last_snapshot(&mut rx, 10).await;
        let (_, status) = healthy.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.contains_key(&3) && status.healthy_batteries.contains_key(&4),
            "expected components to go healthy after initial samples, got {:?}",
            status
        );

        // Jump well past the missing-data tolerance so the silence is noticed.
        tokio::time::advance(std::time::Duration::from_secs(15)).await;
        let unhealthy = last_snapshot(&mut rx, 5).await;

        let (_, status) = unhealthy.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.is_empty(),
            "inverter should be unhealthy after data stops, got healthy set {:?}",
            status.healthy_inverters.keys()
        );
        assert!(
            status.healthy_batteries.is_empty(),
            "battery should be unhealthy after data stops, got healthy set {:?}",
            status.healthy_batteries.keys()
        );
        assert!(status.unhealthy_inverters.contains_key(&3));
        assert!(status.unhealthy_batteries.contains_key(&4));
    }

    #[tokio::test(start_paused = true)]
    async fn component_with_bad_state_is_unhealthy() {
        // The battery reports an Error state while the inverter stays in its default state.
        let mut pool = new_pool(MockComponent::grid(1).with_children(vec![
            MockComponent::meter(2).with_children(vec![
                MockComponent::battery_inverter(3)
                    .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                    .with_children(vec![
                        MockComponent::battery(4)
                            .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                            .with_state(ElectricalComponentStateCode::Error),
                    ]),
            ]),
        ]))
        .await;

        let mut rx = pool.telemetry_snapshots();
        let snap = last_snapshot(&mut rx, 10).await;

        let (_, status) = snap.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.contains_key(&3),
            "inverter with Ready state should be healthy"
        );
        assert!(
            !status.healthy_batteries.contains_key(&4),
            "battery with Error state should not be in healthy set"
        );
        assert!(
            status.unhealthy_batteries.contains_key(&4),
            "battery with Error state should be in unhealthy set, got {:?}",
            status
        );
    }

    /// grid -> meter -> inverter 3 -> batteries 4 and 5.
    fn one_inverter_two_batteries() -> MockComponent {
        MockComponent::grid(1).with_children(vec![MockComponent::meter(2).with_children(vec![
            MockComponent::battery_inverter(3)
                .with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0])
                .with_children(vec![
                    MockComponent::battery(4).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                    MockComponent::battery(5).with_power(vec![0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
                ]),
        ])])
    }

    #[tokio::test(start_paused = true)]
    async fn grouping_rejects_empty_component_ids() {
        let tracker = new_tracker(one_inverter_two_batteries(), BTreeSet::new()).await;
        assert!(tracker.get_inverter_battery_groups().is_err());
    }

    #[tokio::test(start_paused = true)]
    async fn grouping_collects_all_batteries_behind_an_inverter() {
        let tracker = new_tracker(one_inverter_two_batteries(), [4, 5].into()).await;
        let groups = tracker.get_inverter_battery_groups().unwrap();
        assert_eq!(
            groups,
            vec![InverterBatteryGroup::new([3].into(), [4, 5].into())]
        );
    }

    #[tokio::test(start_paused = true)]
    async fn grouping_rejects_partially_requested_group() {
        // Battery 5 shares inverter 3 with battery 4 but was not requested.
        let tracker = new_tracker(one_inverter_two_batteries(), [4].into()).await;
        assert!(tracker.get_inverter_battery_groups().is_err());
    }
}