frequenz_microgrid/microgrid/
pv_pool.rs1use tokio::sync::broadcast;
16
17use std::collections::{BTreeSet, HashSet};
18use std::time::Duration;
19
20use crate::{
21 Bounds, Error, Formula, LogicalMeterHandle, MicrogridClientHandle,
22 client::proto::common::microgrid::electrical_components::ElectricalComponentStateCode,
23 metric,
24 microgrid::{
25 pv_bounds_tracker::PvPoolBoundsTracker,
26 telemetry_tracker::pv_pool_telemetry_tracker::{PvPoolSnapshot, PvPoolTelemetryTracker},
27 },
28 quantity::Power,
29};
30
31pub struct PvPool {
75 component_ids: Option<BTreeSet<u64>>,
76 client: MicrogridClientHandle,
77 logical_meter: LogicalMeterHandle,
78 snapshot_tx: Option<broadcast::WeakSender<PvPoolSnapshot>>,
79 bounds_tx: Option<broadcast::WeakSender<Vec<Bounds<Power>>>>,
80}
81
82impl PvPool {
83 pub(crate) fn try_new(
90 component_ids: Option<BTreeSet<u64>>,
91 client: MicrogridClientHandle,
92 logical_meter: LogicalMeterHandle,
93 ) -> Result<Self, Error> {
94 let this = Self {
95 component_ids,
96 client,
97 logical_meter,
98 snapshot_tx: None,
99 bounds_tx: None,
100 };
101 if let Some(ids) = &this.component_ids {
102 if ids.is_empty() {
103 let e = "component_ids cannot be an empty set".to_string();
104 tracing::error!("{e}");
105 return Err(Error::invalid_component(e));
106 }
107 if !ids.is_subset(&this.get_all_pv_inverter_ids()) {
110 let e = format!("All component_ids {:?} must be PV inverters.", ids);
111 tracing::error!("{e}");
112 return Err(Error::invalid_component(e));
113 }
114 }
115 Ok(this)
116 }
117
118 fn get_all_pv_inverter_ids(&self) -> BTreeSet<u64> {
119 self.logical_meter
120 .graph()
121 .components()
122 .filter(|c| c.is_pv_inverter())
123 .map(|c| c.id)
124 .collect()
125 }
126
127 pub(crate) fn get_pv_inverter_ids(&self) -> BTreeSet<u64> {
128 if let Some(ids) = &self.component_ids {
129 ids.clone()
130 } else {
131 self.get_all_pv_inverter_ids()
132 }
133 }
134
135 pub fn power(&mut self) -> Result<Formula<Power>, Error> {
137 self.logical_meter
138 .pv::<metric::AcPowerActive>(self.component_ids.clone())
139 }
140
141 pub fn power_bounds(&mut self) -> broadcast::Receiver<Vec<Bounds<Power>>> {
148 if let Some(tx) = self
149 .bounds_tx
150 .as_ref()
151 .and_then(broadcast::WeakSender::upgrade)
152 && tx.receiver_count() > 0
153 {
154 return tx.subscribe();
155 }
156 let snapshot_rx = self.telemetry_snapshots();
157 let (tx, rx) = broadcast::channel(100);
158 self.bounds_tx = Some(tx.downgrade());
159 let tracker = PvPoolBoundsTracker::<metric::AcPowerActive>::new(snapshot_rx, tx);
160 tokio::spawn(tracker.run());
161 rx
162 }
163
164 pub fn telemetry_snapshots(&mut self) -> broadcast::Receiver<PvPoolSnapshot> {
171 if let Some(tx) = self
172 .snapshot_tx
173 .as_ref()
174 .and_then(broadcast::WeakSender::upgrade)
175 && tx.receiver_count() > 0
176 {
177 return tx.subscribe();
178 }
179 let (tx, rx) = broadcast::channel(100);
180 self.snapshot_tx = Some(tx.downgrade());
181 let tracker = PvPoolTelemetryTracker::new(
182 self.get_pv_inverter_ids(),
183 Duration::from_secs(10),
184 HashSet::from([
188 ElectricalComponentStateCode::Ready,
189 ElectricalComponentStateCode::Standby,
190 ElectricalComponentStateCode::Discharging,
191 ]),
192 self.client.clone(),
193 tx,
194 );
195 tokio::spawn(tracker.run());
196 rx
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use std::collections::BTreeSet;
203
204 use chrono::TimeDelta;
205
206 use super::PvPool;
207 use crate::{
208 LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle,
209 client::test_utils::{MockComponent, MockMicrogridApiClient},
210 };
211
212 async fn handles(graph: MockComponent) -> (MicrogridClientHandle, LogicalMeterHandle) {
214 let api = MockMicrogridApiClient::new(graph);
215 let client = MicrogridClientHandle::new_from_client(api);
216 let lm = LogicalMeterHandle::try_new(
217 client.clone(),
218 LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap()),
219 )
220 .await
221 .unwrap();
222 (client, lm)
223 }
224
225 fn graph() -> MockComponent {
228 MockComponent::grid(1).with_children(vec![MockComponent::meter(2).with_children(vec![
229 MockComponent::meter(3).with_children(vec![
230 MockComponent::pv_inverter(4),
231 MockComponent::pv_inverter(5),
232 ]),
233 MockComponent::meter(6).with_children(vec![
234 MockComponent::battery_inverter(7).with_children(vec![MockComponent::battery(8)]),
235 ]),
236 ])])
237 }
238
239 #[tokio::test]
240 async fn try_new_rejects_empty_component_ids() {
241 let (client, lm) = handles(graph()).await;
242 let err = PvPool::try_new(Some(BTreeSet::new()), client, lm)
243 .err()
244 .expect("empty component_ids should be rejected");
245 assert!(err.to_string().contains("empty"), "unexpected error: {err}");
246 }
247
248 #[tokio::test]
249 async fn try_new_rejects_non_pv_component_ids() {
250 let (client, lm) = handles(graph()).await;
251 let err = PvPool::try_new(Some([4, 7, 8].into()), client, lm)
253 .err()
254 .expect("non-PV component_ids should be rejected");
255 assert!(
256 err.to_string().contains("must be PV inverters"),
257 "unexpected error: {err}"
258 );
259 }
260
261 #[tokio::test]
262 async fn power_formula_for_explicit_pv_inverters() {
263 let (client, lm) = handles(graph()).await;
264 let mut pool = PvPool::try_new(Some([4, 5].into()), client, lm).unwrap();
265 let formula = pool.power().unwrap();
266 assert_eq!(
267 formula.to_string(),
268 "METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))"
269 );
270 }
271
272 #[tokio::test]
273 async fn power_formula_for_all_pv_inverters() {
274 let (client, lm) = handles(graph()).await;
275 let mut pool = PvPool::try_new(None, client, lm).unwrap();
276 let formula = pool.power().unwrap();
277 assert_eq!(
278 formula.to_string(),
279 "METRIC_AC_POWER_ACTIVE::(COALESCE(#3, COALESCE(#5, 0.0) + COALESCE(#4, 0.0)))"
280 );
281 }
282}