1use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7 client::MicrogridClientHandle,
8 error::Error,
9 proto::common::microgrid::electrical_components::{
10 ElectricalComponent, ElectricalComponentConnection,
11 },
12};
13use frequenz_microgrid_component_graph::{self, ComponentGraph};
14use std::collections::BTreeSet;
15use tokio::sync::mpsc;
16
17use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
18
/// Handle used to request metric formulas from a logical meter.
///
/// A handle is created with `LogicalMeterHandle::try_new`, which also spawns
/// the `LogicalMeterActor` that owns the receiving half of the instruction
/// channel. Cloning the handle clones both fields, i.e. the channel sender and
/// the component graph.
#[derive(Clone)]
pub struct LogicalMeterHandle {
    /// Sending half of the instruction channel; a clone of it is handed to
    /// every formula created through this handle.
    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
    /// Component graph built in `try_new` from the components and connections
    /// listed by the microgrid client.
    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
}
25
26impl LogicalMeterHandle {
27 pub async fn try_new(
29 client: MicrogridClientHandle,
30 config: LogicalMeterConfig,
31 ) -> Result<Self, Error> {
32 let (sender, receiver) = mpsc::channel(8);
33 let graph = ComponentGraph::try_new(
34 client.list_electrical_components(vec![], vec![]).await?,
35 client
36 .list_electrical_component_connections(vec![], vec![])
37 .await?,
38 frequenz_microgrid_component_graph::ComponentGraphConfig {
39 allow_component_validation_failures: true,
40 allow_unconnected_components: true,
41 allow_unspecified_inverters: false,
42 disable_fallback_components: false,
43 },
44 )
45 .map_err(|e| {
46 Error::component_graph_error(format!("Unable to create a component graph: {e}"))
47 })?;
48
49 let logical_meter = LogicalMeterActor::try_new(receiver, client, config)?;
50
51 tokio::task::spawn(async move {
52 logical_meter.run().await;
53 });
54
55 Ok(Self {
56 instructions_tx: sender,
57 graph,
58 })
59 }
60
61 pub fn grid<M: super::metric::Metric>(
64 &mut self,
65 metric: M,
66 ) -> Result<Formula<M::QuantityType>, Error> {
67 Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
68 &self.graph,
69 metric,
70 self.instructions_tx.clone(),
71 )?)))
72 }
73
74 pub fn battery<M: super::metric::Metric>(
79 &mut self,
80 component_ids: Option<BTreeSet<u64>>,
81 metric: M,
82 ) -> Result<Formula<M::QuantityType>, Error> {
83 Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
84 &self.graph,
85 metric,
86 self.instructions_tx.clone(),
87 component_ids,
88 )?)))
89 }
90
91 pub fn chp<M: super::metric::Metric>(
96 &mut self,
97 component_ids: Option<BTreeSet<u64>>,
98 metric: M,
99 ) -> Result<Formula<M::QuantityType>, Error> {
100 Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
101 &self.graph,
102 metric,
103 self.instructions_tx.clone(),
104 component_ids,
105 )?)))
106 }
107
108 pub fn pv<M: super::metric::Metric>(
113 &mut self,
114 component_ids: Option<BTreeSet<u64>>,
115 metric: M,
116 ) -> Result<Formula<M::QuantityType>, Error> {
117 Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
118 &self.graph,
119 metric,
120 self.instructions_tx.clone(),
121 component_ids,
122 )?)))
123 }
124
125 pub fn ev_charger<M: super::metric::Metric>(
131 &mut self,
132 component_ids: Option<BTreeSet<u64>>,
133 metric: M,
134 ) -> Result<Formula<M::QuantityType>, Error> {
135 Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
136 &self.graph,
137 metric,
138 self.instructions_tx.clone(),
139 component_ids,
140 )?)))
141 }
142
143 pub fn consumer<M: super::metric::Metric>(
146 &mut self,
147 metric: M,
148 ) -> Result<Formula<M::QuantityType>, Error> {
149 Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
150 &self.graph,
151 metric,
152 self.instructions_tx.clone(),
153 )?)))
154 }
155
156 pub fn producer<M: super::metric::Metric>(
159 &mut self,
160 metric: M,
161 ) -> Result<Formula<M::QuantityType>, Error> {
162 Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
163 &self.graph,
164 metric,
165 self.instructions_tx.clone(),
166 )?)))
167 }
168
169 pub fn component<M: super::metric::Metric>(
172 &mut self,
173 component_id: u64,
174 metric: M,
175 ) -> Result<Formula<M::QuantityType>, Error> {
176 Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
177 &self.graph,
178 metric,
179 self.instructions_tx.clone(),
180 component_id,
181 )?)))
182 }
183}
184
#[cfg(test)]
mod tests {
    use chrono::TimeDelta;
    use tokio_stream::{StreamExt, wrappers::BroadcastStream};

    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
        client::test_utils::{MockComponent, MockMicrogridApiClient},
        logical_meter::formula::Formula,
        quantity::Quantity,
    };

    /// Creates a `LogicalMeterHandle` backed by a mock microgrid client.
    ///
    /// The mock component tree is:
    ///
    /// ```text
    /// grid 1
    /// └── meter 2                      (power, current)
    ///     ├── meter 3                  (reactive power)
    ///     │   └── pv inverter 4
    ///     ├── meter 5
    ///     │   ├── battery inverter 6   (voltage)
    ///     │   │   └── battery 7
    ///     │   └── battery inverter 8   (voltage)
    ///     │       └── battery 9
    ///     ├── meter 10                 (current)
    ///     ├── meter 11
    ///     │   └── chp 12
    ///     └── meter 13
    ///         ├── ev charger 14
    ///         └── ev charger 15
    /// ```
    ///
    /// The logical meter is configured with a resampling interval of one
    /// second. Panics if the handle cannot be created.
    async fn new_logical_meter_handle() -> LogicalMeterHandle {
        let api_client = MockMicrogridApiClient::new(
            MockComponent::grid(1).with_children(vec![
                // Meter 2 is the only child of the grid component.
                MockComponent::meter(2)
                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
                    .with_children(vec![
                        // PV branch: only the meter carries reactive power data.
                        MockComponent::meter(3)
                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
                            .with_children(vec![MockComponent::pv_inverter(4)]),
                        // Battery branch: two inverters with identical voltage
                        // data, each with one battery.
                        MockComponent::meter(5).with_children(vec![
                            MockComponent::battery_inverter(6)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![MockComponent::battery(7)]),
                            MockComponent::battery_inverter(8)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![MockComponent::battery(9)]),
                        ]),
                        // Meter without any children.
                        MockComponent::meter(10)
                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
                        // CHP branch.
                        MockComponent::meter(11).with_children(vec![MockComponent::chp(12)]),
                        // EV charger branch.
                        MockComponent::meter(13).with_children(vec![
                            MockComponent::ev_charger(14),
                            MockComponent::ev_charger(15),
                        ]),
                    ]),
            ]),
        );

        LogicalMeterHandle::try_new(
            MicrogridClientHandle::new_from_client(api_client),
            LogicalMeterConfig {
                resampling_interval: TimeDelta::try_seconds(1).unwrap(),
            },
        )
        .await
        .unwrap()
    }

    /// Checks the string representation of the formula produced by each
    /// `LogicalMeterHandle` method for the mock component tree.
    #[tokio::test]
    async fn test_formula_display() {
        let mut lm = new_logical_meter_handle().await;

        let formula = lm.grid(crate::metric::AcPowerActive).unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");

        // No component IDs given.
        let formula = lm.battery(None, crate::metric::AcPowerReactive).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
        );

        // Restricted to battery 9, whose inverter is component 8.
        let formula = lm
            .battery(Some([9].into()), crate::metric::AcPowerActive)
            .unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
        );

        // Restricted to battery 7, whose inverter is component 6.
        let formula = lm
            .battery(Some([7].into()), crate::metric::AcVoltage)
            .unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");

        let formula = lm.battery(None, crate::metric::AcFrequency).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
        );

        let formula = lm.pv(None, crate::metric::AcPowerReactive).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
        );

        let formula = lm.chp(None, crate::metric::AcPowerActive).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
        );

        let formula = lm.ev_charger(None, crate::metric::AcCurrent).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
        );

        let formula = lm.consumer(crate::metric::AcCurrent).unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_CURRENT::(MAX(",
                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
                "- #10 - COALESCE(#11, #12, 0.0)",
                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
                " 0.0)",
                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.producer(crate::metric::AcPowerActive).unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_POWER_ACTIVE::(",
                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.component(10, crate::metric::AcCurrent).unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
    }

    /// Subscribes to the grid active power formula and checks the first ten
    /// samples, in watts. Runs with the Tokio clock paused.
    #[tokio::test(start_paused = true)]
    async fn test_grid_power_formula() {
        let formula = new_logical_meter_handle()
            .await
            .grid(crate::metric::AcPowerActive)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_watts(),
            vec![
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
            ],
        )
    }

    /// Subscribes to the PV reactive power formula and checks the first ten
    /// samples, in volt-amperes reactive. Runs with the Tokio clock paused.
    #[tokio::test(start_paused = true)]
    async fn test_pv_reactive_power_formula() {
        let formula = new_logical_meter_handle()
            .await
            .pv(None, crate::metric::AcPowerReactive)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_volt_amperes_reactive(),
            vec![
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
            ],
        )
    }

    /// Subscribes to the battery voltage formula and checks the first ten
    /// samples, in volts. Runs with the Tokio clock paused.
    #[tokio::test(start_paused = true)]
    async fn test_battery_voltage_formula() {
        let formula = new_logical_meter_handle()
            .await
            .battery(None, crate::metric::AcVoltage)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_volts(),
            vec![
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
            ],
        )
    }

    /// Subscribes to the consumer current formula and checks the first ten
    /// samples, in amperes. Runs with the Tokio clock paused.
    #[tokio::test(start_paused = true)]
    async fn test_consumer_current_formula() {
        let formula = new_logical_meter_handle()
            .await
            .consumer(crate::metric::AcCurrent)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_amperes(),
            vec![
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
            ],
        )
    }

    /// Subscribes to `formula` and collects up to `num_values` samples from it.
    ///
    /// Panics if subscribing fails or if the broadcast stream yields an error.
    /// Fewer than `num_values` samples are returned if the stream ends early.
    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
        let rx = formula.subscribe().await.unwrap();

        BroadcastStream::new(rx)
            .take(num_values)
            .map(|x| x.unwrap())
            .collect()
            .await
    }

    /// Asserts that `samples` match `expected_values`.
    ///
    /// Three things are checked:
    ///
    /// * the number of samples equals the number of expected values,
    /// * consecutive samples are exactly one second apart,
    /// * each sample value, converted with `extractor`, is within `0.01` of
    ///   the expected value, or both are `None`.
    ///
    /// All assertions live directly in this function body rather than in
    /// closures, so that `#[track_caller]` reports failures at the call site
    /// in the test.
    #[track_caller]
    fn check_samples<Q: Quantity>(
        samples: Vec<Sample<Q>>,
        extractor: impl Fn(Q) -> f32,
        expected_values: Vec<Option<f32>>,
    ) {
        // `zip` below stops at the shorter input, so without this check a run
        // that produced too few or too many samples would pass silently.
        assert_eq!(
            samples.len(),
            expected_values.len(),
            "expected {} samples, got {}",
            expected_values.len(),
            samples.len()
        );

        let one_second = TimeDelta::try_seconds(1).unwrap();
        for w in samples.windows(2) {
            assert_eq!(w[1].timestamp() - w[0].timestamp(), one_second);
        }

        let values = samples
            .iter()
            .map(|res| res.value().map(&extractor))
            .collect::<Vec<_>>();

        for (v, ev) in values.iter().zip(expected_values.iter()) {
            match (v, ev) {
                (Some(v), Some(ev)) => assert!(
                    (v - ev).abs() < 0.01,
                    "expected value {ev:?}, got value {v:?}"
                ),
                (None, None) => {}
                _ => panic!("expected value {ev:?}, got value {v:?}"),
            }
        }
    }
}