Skip to main content

frequenz_microgrid/microgrid/
battery_pool.rs

// License: MIT
// Copyright © 2026 Frequenz Energy-as-a-Service GmbH

//! Representation of a pool of batteries in the microgrid.
5
6use tokio::sync::broadcast;
7
8use std::collections::{BTreeSet, HashSet};
9use std::time::Duration;
10
11use crate::{
12    Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
13    client::{
14        ElectricalComponentCategory,
15        proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
16    },
17    metric,
18    microgrid::{
19        battery_bounds_tracker::BatteryPoolBoundsTracker,
20        telemetry_tracker::battery_pool_telemetry_tracker::{
21            BatteryPoolSnapshot, BatteryPoolTelemetryTracker,
22        },
23    },
24    quantity::Power,
25};
26
27/// An interface for abstracting over a pool of batteries in the microgrid.
28pub struct BatteryPool {
29    component_ids: Option<BTreeSet<u64>>,
30    client: MicrogridClientHandle,
31    logical_meter: LogicalMeterHandle,
32    snapshot_tx: Option<broadcast::WeakSender<BatteryPoolSnapshot>>,
33    bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
34}
35
36impl BatteryPool {
37    /// Creates a new `BatteryPool` instance with the given component IDs,
38    /// client and logical meter handles.
39    pub(crate) fn try_new(
40        component_ids: Option<BTreeSet<u64>>,
41        client: MicrogridClientHandle,
42        logical_meter: LogicalMeterHandle,
43    ) -> Result<Self, Error> {
44        let this = Self {
45            component_ids,
46            client,
47            logical_meter,
48            snapshot_tx: None,
49            bounds_tx: None,
50        };
51        if let Some(ids) = &this.component_ids {
52            if ids.is_empty() {
53                let e = "component_ids cannot be an empty set".to_string();
54                tracing::error!("{e}");
55                return Err(Error::invalid_component(e));
56            }
57            // Validate that all provided IDs correspond to batteries in the graph.
58            if !ids.is_subset(&this.get_all_battery_ids()) {
59                let e = format!("All component_ids {:?} must be batteries.", ids);
60                tracing::error!("{e}");
61                return Err(Error::invalid_component(e));
62            }
63        }
64        Ok(this)
65    }
66
67    fn get_all_battery_ids(&self) -> BTreeSet<u64> {
68        self.logical_meter
69            .graph()
70            .components()
71            .filter(|c| c.category() == ElectricalComponentCategory::Battery)
72            .map(|c| c.id)
73            .collect()
74    }
75
76    pub(crate) fn get_battery_ids(&self) -> BTreeSet<u64> {
77        if let Some(ids) = &self.component_ids {
78            ids.clone()
79        } else {
80            self.get_all_battery_ids()
81        }
82    }
83
84    /// Returns a formula for the active power of the battery pool.
85    pub fn power(&mut self) -> Result<Formula<Power>, Error> {
86        self.logical_meter
87            .battery::<metric::AcPowerActive>(self.component_ids.clone())
88    }
89
90    /// Returns a receiver for the aggregated active-power bounds of the pool,
91    /// updated on each snapshot.
92    ///
93    /// Reuses the running bounds tracker if one exists and still has active
94    /// receivers; otherwise starts a new one (which also starts or reuses the
95    /// underlying telemetry tracker).
96    pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
97        if let Some(tx) = self
98            .bounds_tx
99            .as_ref()
100            .and_then(broadcast::WeakSender::upgrade)
101            && tx.receiver_count() > 0
102        {
103            return tx.subscribe();
104        }
105        let snapshot_rx = self.telemetry_snapshots();
106        let (tx, rx) = broadcast::channel(100);
107        self.bounds_tx = Some(tx.downgrade());
108        let tracker = BatteryPoolBoundsTracker::<metric::AcPowerActive, metric::DcPower>::new(
109            snapshot_rx,
110            tx,
111        );
112        tokio::spawn(tracker.run());
113        rx
114    }
115
116    /// Returns a receiver for a stream of [`BatteryPoolSnapshot`] values,
117    /// each reflecting the latest component telemetry partitioned into
118    /// healthy and unhealthy sets.
119    ///
120    /// Reuses the running tracker if one exists and still has active receivers
121    /// (including any held by a bounds tracker); otherwise starts a new one.
122    pub(crate) fn telemetry_snapshots(&mut self) -> broadcast::Receiver<BatteryPoolSnapshot> {
123        if let Some(tx) = self
124            .snapshot_tx
125            .as_ref()
126            .and_then(broadcast::WeakSender::upgrade)
127            && tx.receiver_count() > 0
128        {
129            return tx.subscribe();
130        }
131        let (tx, rx) = broadcast::channel(100);
132        self.snapshot_tx = Some(tx.downgrade());
133        let tracker = BatteryPoolTelemetryTracker::new(
134            self.get_battery_ids(),
135            Duration::from_secs(10),
136            HashSet::from([
137                ElectricalComponentStateCode::Ready,
138                ElectricalComponentStateCode::Standby,
139                ElectricalComponentStateCode::Charging,
140                ElectricalComponentStateCode::Discharging,
141                ElectricalComponentStateCode::RelayClosed,
142            ]),
143            self.client.clone(),
144            self.logical_meter.clone(),
145            tx,
146        );
147        tokio::spawn(tracker.run());
148        rx
149    }
150}