Skip to main content

frequenz_microgrid/microgrid/telemetry_tracker/
battery_pool_telemetry_tracker.rs

1// License: MIT
2// Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3
4//! A telemetry tracker for a pool of batteries and their associated inverters.
5
6use std::{
7    collections::{BTreeSet, HashMap, HashSet},
8    time::Duration,
9};
10
11use crate::{
12    Error, LogicalMeterHandle, MicrogridClientHandle,
13    client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
14    microgrid::telemetry_tracker::inverter_battery_group_telemetry_tracker::{
15        InverterBatteryGroupStatus, InverterBatteryGroupTelemetryTracker,
16    },
17};
18
/// An `MxN` inverter-battery group: `M` inverters connected in parallel on the
/// AC side and `N` batteries connected in parallel on the DC side, with the
/// inverter side in series with the battery side.
///
/// Both ID collections are ordered sets, so two groups built from the same
/// component IDs compare and hash identically regardless of insertion order.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct InverterBatteryGroup {
    /// Component IDs of the inverters in this group.
    pub inverter_ids: BTreeSet<u64>,
    /// Component IDs of the batteries in this group.
    pub battery_ids: BTreeSet<u64>,
}

impl InverterBatteryGroup {
    /// Builds a group from its inverter and battery component IDs.
    pub(crate) fn new(inverter_ids: BTreeSet<u64>, battery_ids: BTreeSet<u64>) -> Self {
        Self {
            battery_ids,
            inverter_ids,
        }
    }
}
36
37#[derive(Clone, Debug, PartialEq)]
38pub struct BatteryPoolSnapshot(HashMap<InverterBatteryGroup, InverterBatteryGroupStatus>);
39
40impl BatteryPoolSnapshot {
41    pub fn groups(&self) -> &HashMap<InverterBatteryGroup, InverterBatteryGroupStatus> {
42        &self.0
43    }
44}
45
46/// A tracker that watches every inverter-battery group in the pool and emits
47/// a [`BatteryPoolSnapshot`] whenever any component's telemetry or health
48/// classification changes.
49#[derive(Clone)]
50pub struct BatteryPoolTelemetryTracker {
51    component_ids: BTreeSet<u64>,
52    component_pool_status_tx: tokio::sync::broadcast::Sender<BatteryPoolSnapshot>,
53    missing_data_tolerance: Duration,
54    healthy_state_codes: HashSet<ElectricalComponentStateCode>,
55    client: MicrogridClientHandle,
56    logical_meter: LogicalMeterHandle,
57}
58
59impl BatteryPoolTelemetryTracker {
60    pub(crate) fn new(
61        component_ids: BTreeSet<u64>,
62        missing_data_tolerance: Duration,
63        healthy_state_codes: HashSet<ElectricalComponentStateCode>,
64        client: MicrogridClientHandle,
65        logical_meter: LogicalMeterHandle,
66        component_pool_status_tx: tokio::sync::broadcast::Sender<BatteryPoolSnapshot>,
67    ) -> Self {
68        Self {
69            component_ids,
70            component_pool_status_tx,
71            missing_data_tolerance,
72            healthy_state_codes,
73            client,
74            logical_meter,
75        }
76    }
77
78    pub(crate) fn get_inverter_battery_groups(&self) -> Result<Vec<InverterBatteryGroup>, Error> {
79        if self.component_ids.is_empty() {
80            let e = "No component IDs provided for BatteryPoolTelemetryTracker".to_string();
81            tracing::error!("{}", e);
82            return Err(Error::component_data_error(e));
83        }
84        let mut unvisited_batteries = self.component_ids.clone();
85        let mut groups = Vec::new();
86
87        let graph = self.logical_meter.graph();
88
89        while let Some(battery_id) = unvisited_batteries.iter().next().cloned() {
90            let group_inverters = graph
91                .predecessors(battery_id)?
92                .filter(|c| c.category() == crate::client::ElectricalComponentCategory::Inverter)
93                .map(|c| c.id)
94                .collect::<BTreeSet<_>>();
95
96            if group_inverters.is_empty() {
97                let e = format!("Battery {} is not connected to any inverters.", battery_id);
98                tracing::error!("{}", e);
99                return Err(Error::component_data_error(e));
100            }
101
102            let mut group_batteries = BTreeSet::new();
103            for inverter_id in &group_inverters {
104                let connected_batteries = graph
105                    .successors(*inverter_id)?
106                    .map(|c| c.id)
107                    .collect::<BTreeSet<_>>();
108
109                group_batteries.extend(connected_batteries);
110            }
111
112            // Ensure that all group batteries are part of the request.
113            if !group_batteries.is_subset(&self.component_ids) {
114                let e = format!(
115                    concat!(
116                        "Inverters {:?} are connected to batteries {:?} which are not all in ",
117                        "the requested component IDs {:?}"
118                    ),
119                    group_inverters, group_batteries, self.component_ids
120                );
121
122                tracing::error!("{}", e);
123                return Err(Error::component_data_error(e));
124            }
125
126            // Remove the group batteries from the unvisited set
127            unvisited_batteries.retain(|b| !group_batteries.contains(b));
128
129            // Ensure that group batteries are only connect to group inverters
130            for battery_id in &group_batteries {
131                let connected_inverters = graph
132                    .predecessors(*battery_id)?
133                    .filter(|c| {
134                        c.category() == crate::client::ElectricalComponentCategory::Inverter
135                    })
136                    .map(|c| c.id)
137                    .collect::<BTreeSet<_>>();
138
139                if !connected_inverters.is_subset(&group_inverters) {
140                    let e = format!(
141                        "Battery {} is connected to inverters {:?} which are not all in the same group {:?}",
142                        battery_id, connected_inverters, group_inverters
143                    );
144                    tracing::error!("{}", e);
145                    return Err(Error::component_data_error(e));
146                }
147            }
148
149            groups.push(InverterBatteryGroup::new(group_inverters, group_batteries));
150        }
151
152        Ok(groups)
153    }
154
155    pub async fn run(self) -> Result<(), Error> {
156        let mut inverter_battery_group_data = HashMap::new();
157
158        let inverter_battery_group_ids = self.get_inverter_battery_groups()?;
159
160        let (component_status_tx, mut component_status_rx) = tokio::sync::mpsc::channel(100);
161        for inverter_battery_group in inverter_battery_group_ids {
162            let tracker = InverterBatteryGroupTelemetryTracker::new(
163                inverter_battery_group,
164                self.missing_data_tolerance,
165                self.healthy_state_codes.clone(),
166                self.client.clone(),
167                component_status_tx.clone(),
168            );
169            // Spawn a task for each group telemetry tracker
170            tokio::spawn(tracker.run());
171        }
172
173        // Drop the original sender so that the channel will close when all
174        // trackers finish.
175        drop(component_status_tx);
176
177        let mut interval = tokio::time::interval(Duration::from_millis(200));
178        let mut last_sent_status = None;
179
180        loop {
181            tokio::select! {
182                maybe_status = component_status_rx.recv() => {
183                    match maybe_status {
184                        Some((group_ids, status)) => {
185                            inverter_battery_group_data.insert(group_ids, status);
186                        }
187                        // Every group tracker has exited and dropped its sender,
188                        // so no further updates will arrive. The `interval.tick()`
189                        // arm never disables, so the `select!` `else` can never
190                        // run; break here instead.
191                        None => break,
192                    }
193                },
194                _ = interval.tick() => {
195                    if last_sent_status.as_ref() == Some(&inverter_battery_group_data) {
196                        continue; // Skip sending if the status hasn't changed
197                    }
198                    if let Err(e) = self.component_pool_status_tx.send(
199                        BatteryPoolSnapshot(inverter_battery_group_data.clone())
200                    )
201                    {
202                        tracing::error!("Failed to send pool snapshot: {}", e);
203                        break;
204                    }
205                    last_sent_status = Some(inverter_battery_group_data.clone());
206                },
207            }
208        }
209
210        let err = format!(
211            "BatteryPoolTelemetryTracker (component IDs {:?}) stopped receiving group telemetry updates.",
212            self.component_ids
213        );
214
215        tracing::error!("{}", err);
216
217        Err(Error::component_data_error(err))
218    }
219}
220
#[cfg(test)]
mod tests {
    use std::{
        collections::{BTreeSet, HashMap},
        time::Duration,
    };

    use chrono::TimeDelta;

    use super::BatteryPoolSnapshot;
    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
        client::{
            proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
            test_utils::{MockComponent, MockMicrogridApiClient},
        },
        microgrid::{
            battery_pool::BatteryPool,
            telemetry_tracker::{
                battery_pool_telemetry_tracker::InverterBatteryGroup,
                inverter_battery_group_telemetry_tracker::InverterBatteryGroupStatus,
            },
        },
    };

    impl BatteryPoolSnapshot {
        /// Test-only constructor that wraps a pre-built group map.
        pub(crate) fn from_groups(
            groups: HashMap<InverterBatteryGroup, InverterBatteryGroupStatus>,
        ) -> Self {
            Self(groups)
        }
    }

    /// Places `children` under `grid(1) → meter(2)`.
    fn behind_grid_meter(children: Vec<MockComponent>) -> MockComponent {
        MockComponent::grid(1).with_children(vec![MockComponent::meter(2).with_children(children)])
    }

    /// `grid(1) → meter(2) → battery_inverter(3) → battery(4)`, with six
    /// zero-power samples on both the inverter and the battery.
    fn single_group_topology() -> MockComponent {
        behind_grid_meter(vec![
            MockComponent::battery_inverter(3)
                .with_power(vec![0.0; 6])
                .with_children(vec![MockComponent::battery(4).with_power(vec![0.0; 6])]),
        ])
    }

    /// Builds a `BatteryPool` over a mock API client that serves `graph`.
    async fn new_pool(graph: MockComponent) -> BatteryPool {
        let client = MicrogridClientHandle::new_from_client(MockMicrogridApiClient::new(graph));
        let config = LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap());
        let logical_meter = LogicalMeterHandle::try_new(client.clone(), config)
            .await
            .unwrap();
        BatteryPool::try_new(None, client, logical_meter).unwrap()
    }

    /// Advances paused time in `steps` increments of 100ms and drains `rx`
    /// after each one. Returns the newest snapshot seen; panics if none arrived.
    async fn last_snapshot(
        rx: &mut tokio::sync::broadcast::Receiver<BatteryPoolSnapshot>,
        steps: u32,
    ) -> BatteryPoolSnapshot {
        let mut latest = None;
        for _ in 0..steps {
            tokio::time::advance(Duration::from_millis(100)).await;
            // Only the newest snapshot queued during this step is kept.
            while let Ok(snapshot) = rx.try_recv() {
                latest = Some(snapshot);
            }
        }
        latest.expect("no snapshot received")
    }

    #[tokio::test(start_paused = true)]
    async fn single_group_reaches_healthy_state() {
        let mut pool = new_pool(single_group_topology()).await;
        let mut rx = pool.telemetry_snapshots();

        let snapshot = last_snapshot(&mut rx, 10).await;
        let groups = snapshot.groups();
        assert_eq!(
            groups.len(),
            1,
            "expected exactly one inverter-battery group"
        );

        let (group, status) = groups.iter().next().unwrap();
        assert_eq!(group.inverter_ids, BTreeSet::from([3]));
        assert_eq!(group.battery_ids, BTreeSet::from([4]));
        assert!(status.healthy_inverters.contains_key(&3));
        assert!(status.healthy_batteries.contains_key(&4));
        assert!(status.unhealthy_inverters.is_empty());
        assert!(status.unhealthy_batteries.is_empty());
    }

    #[tokio::test(start_paused = true)]
    async fn two_disjoint_groups_both_appear_in_snapshot() {
        // grid → meter → [battery_inverter(3)→battery(4), battery_inverter(5)→battery(6)]
        let mut pool = new_pool(behind_grid_meter(vec![
            MockComponent::battery_inverter(3)
                .with_power(vec![0.0; 6])
                .with_children(vec![MockComponent::battery(4).with_power(vec![0.0; 6])]),
            MockComponent::battery_inverter(5)
                .with_power(vec![0.0; 6])
                .with_children(vec![MockComponent::battery(6).with_power(vec![0.0; 6])]),
        ]))
        .await;
        let mut rx = pool.telemetry_snapshots();

        let snapshot = last_snapshot(&mut rx, 10).await;
        let groups = snapshot.groups();
        assert_eq!(groups.len(), 2);

        let mut all_inverters = BTreeSet::new();
        let mut all_batteries = BTreeSet::new();
        for group in groups.keys() {
            all_inverters.extend(group.inverter_ids.iter().copied());
            all_batteries.extend(group.battery_ids.iter().copied());
        }
        assert_eq!(all_inverters, BTreeSet::from([3, 5]));
        assert_eq!(all_batteries, BTreeSet::from([4, 6]));

        assert!(groups.values().all(|status| {
            status.unhealthy_inverters.is_empty() && status.unhealthy_batteries.is_empty()
        }));
    }

    #[tokio::test(start_paused = true)]
    async fn calling_telemetry_snapshots_twice_reuses_sender() {
        let mut pool = new_pool(single_group_topology()).await;

        let mut first_rx = pool.telemetry_snapshots();
        let mut second_rx = pool.telemetry_snapshots();

        // Give the tracker time to publish at least one snapshot to both.
        tokio::time::advance(Duration::from_millis(300)).await;

        let first = first_rx.recv().await.unwrap();
        let second = second_rx.recv().await.unwrap();
        assert_eq!(
            first, second,
            "both subscriptions should observe the same snapshot"
        );
    }

    #[tokio::test(start_paused = true)]
    async fn components_become_unhealthy_when_data_stops() {
        // Both components send three samples and then go quiet. The stream
        // stays open, so the client actor does not reconnect and resupply data.
        let mut pool = new_pool(behind_grid_meter(vec![
            MockComponent::battery_inverter(3)
                .with_power(vec![0.0; 3])
                .with_silence_after_metrics()
                .with_children(vec![
                    MockComponent::battery(4)
                        .with_power(vec![0.0; 3])
                        .with_silence_after_metrics(),
                ]),
        ]))
        .await;
        let mut rx = pool.telemetry_snapshots();

        // Phase 1: the initial samples (about 600ms) should make both healthy.
        let healthy = last_snapshot(&mut rx, 10).await;
        let (_, status) = healthy.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.contains_key(&3) && status.healthy_batteries.contains_key(&4),
            "expected components to go healthy after initial samples, got {:?}",
            status
        );

        // Phase 2: jump well past the 10s missing-data tolerance, so the
        // component trackers reclassify both components as unhealthy.
        tokio::time::advance(Duration::from_secs(15)).await;
        let unhealthy = last_snapshot(&mut rx, 5).await;

        let (_, status) = unhealthy.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.is_empty(),
            "inverter should be unhealthy after data stops, got healthy set {:?}",
            status.healthy_inverters.keys()
        );
        assert!(
            status.healthy_batteries.is_empty(),
            "battery should be unhealthy after data stops, got healthy set {:?}",
            status.healthy_batteries.keys()
        );
        assert!(status.unhealthy_inverters.contains_key(&3));
        assert!(status.unhealthy_batteries.contains_key(&4));
    }

    #[tokio::test(start_paused = true)]
    async fn component_with_bad_state_is_unhealthy() {
        // The battery keeps sending samples but reports an Error state, so it
        // must still be classified as unhealthy.
        let mut pool = new_pool(behind_grid_meter(vec![
            MockComponent::battery_inverter(3)
                .with_power(vec![0.0; 6])
                .with_children(vec![
                    MockComponent::battery(4)
                        .with_power(vec![0.0; 6])
                        .with_state(ElectricalComponentStateCode::Error),
                ]),
        ]))
        .await;
        let mut rx = pool.telemetry_snapshots();

        let snapshot = last_snapshot(&mut rx, 10).await;
        let (_, status) = snapshot.groups().iter().next().unwrap();
        assert!(
            status.healthy_inverters.contains_key(&3),
            "inverter with Ready state should be healthy"
        );
        assert!(
            !status.healthy_batteries.contains_key(&4),
            "battery with Error state should not be in healthy set"
        );
        assert!(
            status.unhealthy_batteries.contains_key(&4),
            "battery with Error state should be in unhealthy set, got {:?}",
            status
        );
    }
}