Skip to main content

frequenz_microgrid/microgrid/telemetry_tracker/
inverter_battery_group_telemetry_tracker.rs

1// License: MIT
2// Copyright © 2026 Frequenz Energy-as-a-Service GmbH
3
4//! A telemetry tracker for an inverter-battery group in the microgrid, which
5//! consists of a set of inverters and their associated batteries, connected
6//! in MxN configuration. Emits snapshots that partition the group's
7//! components into healthy and unhealthy sets, each annotated with the
8//! latest telemetry sample.
9
10use std::{
11    collections::{HashMap, HashSet},
12    time::Duration,
13};
14
15use tokio::select;
16
17use crate::{
18    Error, MicrogridClientHandle,
19    client::proto::common::microgrid::electrical_components::{
20        ElectricalComponentStateCode, ElectricalComponentTelemetry,
21    },
22    microgrid::telemetry_tracker::battery_pool_telemetry_tracker::InverterBatteryGroup,
23};
24
25use super::component_telemetry_tracker::{ComponentHealthStatus, ComponentTelemetryTracker};
26
27/// A telemetry tracker for an inverter-battery group, which consists of a set
28/// of inverters and their associated batteries, connected in MxN
29/// configuration.
30///
31/// On every change, the tracker emits an [`InverterBatteryGroupStatus`] which
32/// partitions the group's components into healthy and unhealthy sets and
33/// carries the latest [`ElectricalComponentTelemetry`] sample seen for each
34/// component. Downstream consumers (e.g. the bounds tracker) can therefore
35/// read both the health state and the most recent metric samples from a
36/// single subscription without re-subscribing to the telemetry streams.
37#[derive(Clone)]
38pub(crate) struct InverterBatteryGroupTelemetryTracker {
39    inverter_battery_group: InverterBatteryGroup,
40    status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>,
41    missing_data_tolerance: Duration,
42    healthy_state_codes: HashSet<ElectricalComponentStateCode>,
43    client: MicrogridClientHandle,
44}
45
46/// A snapshot of an inverter-battery group's components, partitioned by health
47/// status and annotated with the latest telemetry sample for each component.
48///
49/// The `healthy_*` maps hold the most recent [`ElectricalComponentTelemetry`]
50/// observed for each healthy component. The `unhealthy_*` maps hold the last
51/// telemetry observed before the component became unhealthy, or `None` if no
52/// sample has been received yet. Consumers can use the telemetry (including
53/// per-metric bounds) directly without subscribing to the raw streams again.
54#[derive(Clone, Debug, PartialEq)]
55pub struct InverterBatteryGroupStatus {
56    pub healthy_inverters: HashMap<u64, ElectricalComponentTelemetry>,
57    pub healthy_batteries: HashMap<u64, ElectricalComponentTelemetry>,
58    pub unhealthy_inverters: HashMap<u64, Option<ElectricalComponentTelemetry>>,
59    pub unhealthy_batteries: HashMap<u64, Option<ElectricalComponentTelemetry>>,
60}
61
62impl InverterBatteryGroupTelemetryTracker {
63    pub(crate) fn new(
64        inverter_battery_group: InverterBatteryGroup,
65        missing_data_tolerance: Duration,
66        healthy_state_codes: HashSet<ElectricalComponentStateCode>,
67        client: MicrogridClientHandle,
68        status_tx: tokio::sync::mpsc::Sender<(InverterBatteryGroup, InverterBatteryGroupStatus)>,
69    ) -> Self {
70        Self {
71            inverter_battery_group,
72            status_tx,
73            missing_data_tolerance,
74            healthy_state_codes,
75            client,
76        }
77    }
78
79    pub async fn run(self) -> Result<(), Error> {
80        let mut healthy_inverters = HashMap::new();
81        let mut unhealthy_inverters = HashMap::new();
82        let mut healthy_batteries = HashMap::new();
83        let mut unhealthy_batteries = HashMap::new();
84
85        let (inverter_status_tx, mut inverter_status_rx) = tokio::sync::mpsc::channel(100);
86
87        for &inverter_id in &self.inverter_battery_group.inverter_ids {
88            let component_data_stream = self
89                .client
90                .receive_electrical_component_telemetry_stream(inverter_id)
91                .await?;
92            let tracker = ComponentTelemetryTracker::new(
93                inverter_id,
94                self.missing_data_tolerance,
95                self.healthy_state_codes.clone(),
96                component_data_stream,
97                inverter_status_tx.clone(),
98            );
99            // Spawn a task for each component telemetry tracker
100            tokio::spawn(async move {
101                tracker.run().await;
102            });
103            // Initially mark the component as unhealthy until we see data.
104            unhealthy_inverters.insert(inverter_id, None);
105        }
106
107        let (battery_status_tx, mut battery_status_rx) = tokio::sync::mpsc::channel(100);
108
109        for &battery_id in &self.inverter_battery_group.battery_ids {
110            let component_data_stream = self
111                .client
112                .receive_electrical_component_telemetry_stream(battery_id)
113                .await?;
114            let tracker = ComponentTelemetryTracker::new(
115                battery_id,
116                self.missing_data_tolerance,
117                self.healthy_state_codes.clone(),
118                component_data_stream,
119                battery_status_tx.clone(),
120            );
121            // Spawn a task for each component telemetry tracker
122            tokio::spawn(async move {
123                tracker.run().await;
124            });
125            // Initially mark the component as unhealthy until we see data.
126            unhealthy_batteries.insert(battery_id, None);
127        }
128
129        // Drop the original senders in the main task to allow the component
130        // trackers to close the channels when they finish, which will signal
131        // the main loop to stop.
132        drop(inverter_status_tx);
133        drop(battery_status_tx);
134
135        loop {
136            select! {
137                inverter_status = inverter_status_rx.recv() => {
138                    let Some(inverter_status) = inverter_status else {
139                        let e = String::from("Inverter telemetry tracker stopped receiving status updates.");
140                        tracing::error!("{}", e);
141                        return Err(Error::component_data_error(e));
142                    };
143                    match inverter_status {
144                        ComponentHealthStatus::Healthy(component_id, data) => {
145                            healthy_inverters.insert(component_id, data);
146                            unhealthy_inverters.remove(&component_id);
147                        }
148                        ComponentHealthStatus::Unhealthy(component_id, data) => {
149                            unhealthy_inverters.insert(component_id, data);
150                            healthy_inverters.remove(&component_id);
151                        }
152                    }
153                },
154                battery_status = battery_status_rx.recv() => {
155                    let Some(battery_status) =  battery_status  else {
156                        let e = String::from(
157                            "Battery telemetry tracker stopped receiving status updates."
158                        );
159                        tracing::error!("{}", e);
160                        return Err(Error::component_data_error(e));
161                    };
162                    match battery_status {
163                        ComponentHealthStatus::Healthy(component_id, data) => {
164                            healthy_batteries.insert(component_id, data);
165                            unhealthy_batteries.remove(&component_id);
166                        }
167                        ComponentHealthStatus::Unhealthy(component_id, data) => {
168                            unhealthy_batteries.insert(component_id, data);
169                            healthy_batteries.remove(&component_id);
170                        }
171                    }
172                }
173            }
174            if let Err(e) = self
175                .status_tx
176                .send((
177                    self.inverter_battery_group.clone(),
178                    InverterBatteryGroupStatus {
179                        healthy_inverters: healthy_inverters.clone(),
180                        healthy_batteries: healthy_batteries.clone(),
181                        unhealthy_inverters: unhealthy_inverters.clone(),
182                        unhealthy_batteries: unhealthy_batteries.clone(),
183                    },
184                ))
185                .await
186            {
187                tracing::error!("Failed to send inverter-battery group status: {}", e);
188                return Err(Error::component_data_error(format!(
189                    "Failed to send inverter-battery group status: {}",
190                    e
191                )));
192            }
193        }
194    }
195}