frequenz_microgrid/microgrid/
battery_pool.rs1use tokio::sync::broadcast;
7
8use std::collections::{BTreeSet, HashSet};
9use std::time::Duration;
10
11use crate::{
12 Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
13 client::{
14 ElectricalComponentCategory,
15 proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
16 },
17 metric,
18 microgrid::{
19 battery_bounds_tracker::BatteryPoolBoundsTracker,
20 telemetry_tracker::battery_pool_telemetry_tracker::{
21 BatteryPoolSnapshot, BatteryPoolTelemetryTracker,
22 },
23 },
24 quantity::Power,
25};
26
27pub struct BatteryPool {
29 component_ids: Option<BTreeSet<u64>>,
30 client: MicrogridClientHandle,
31 logical_meter: LogicalMeterHandle,
32 snapshot_tx: Option<broadcast::WeakSender<BatteryPoolSnapshot>>,
33 bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
34}
35
36impl BatteryPool {
37 pub(crate) fn try_new(
40 component_ids: Option<BTreeSet<u64>>,
41 client: MicrogridClientHandle,
42 logical_meter: LogicalMeterHandle,
43 ) -> Result<Self, Error> {
44 let this = Self {
45 component_ids,
46 client,
47 logical_meter,
48 snapshot_tx: None,
49 bounds_tx: None,
50 };
51 if let Some(ids) = &this.component_ids {
52 if ids.is_empty() {
53 let e = "component_ids cannot be an empty set".to_string();
54 tracing::error!("{e}");
55 return Err(Error::invalid_component(e));
56 }
57 if !ids.is_subset(&this.get_all_battery_ids()) {
59 let e = format!("All component_ids {:?} must be batteries.", ids);
60 tracing::error!("{e}");
61 return Err(Error::invalid_component(e));
62 }
63 }
64 Ok(this)
65 }
66
67 fn get_all_battery_ids(&self) -> BTreeSet<u64> {
68 self.logical_meter
69 .graph()
70 .components()
71 .filter(|c| c.category() == ElectricalComponentCategory::Battery)
72 .map(|c| c.id)
73 .collect()
74 }
75
76 pub(crate) fn get_battery_ids(&self) -> BTreeSet<u64> {
77 if let Some(ids) = &self.component_ids {
78 ids.clone()
79 } else {
80 self.get_all_battery_ids()
81 }
82 }
83
84 pub fn power(&mut self) -> Result<Formula<Power>, Error> {
86 self.logical_meter
87 .battery::<metric::AcPowerActive>(self.component_ids.clone())
88 }
89
90 pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
97 if let Some(tx) = self
98 .bounds_tx
99 .as_ref()
100 .and_then(broadcast::WeakSender::upgrade)
101 && tx.receiver_count() > 0
102 {
103 return tx.subscribe();
104 }
105 let snapshot_rx = self.telemetry_snapshots();
106 let (tx, rx) = broadcast::channel(100);
107 self.bounds_tx = Some(tx.downgrade());
108 let tracker = BatteryPoolBoundsTracker::<metric::AcPowerActive, metric::DcPower>::new(
109 snapshot_rx,
110 tx,
111 );
112 tokio::spawn(tracker.run());
113 rx
114 }
115
116 pub(crate) fn telemetry_snapshots(&mut self) -> broadcast::Receiver<BatteryPoolSnapshot> {
123 if let Some(tx) = self
124 .snapshot_tx
125 .as_ref()
126 .and_then(broadcast::WeakSender::upgrade)
127 && tx.receiver_count() > 0
128 {
129 return tx.subscribe();
130 }
131 let (tx, rx) = broadcast::channel(100);
132 self.snapshot_tx = Some(tx.downgrade());
133 let tracker = BatteryPoolTelemetryTracker::new(
134 self.get_battery_ids(),
135 Duration::from_secs(10),
136 HashSet::from([
137 ElectricalComponentStateCode::Ready,
138 ElectricalComponentStateCode::Standby,
139 ElectricalComponentStateCode::Charging,
140 ElectricalComponentStateCode::Discharging,
141 ElectricalComponentStateCode::RelayClosed,
142 ]),
143 self.client.clone(),
144 self.logical_meter.clone(),
145 tx,
146 );
147 tokio::spawn(tracker.run());
148 rx
149 }
150}