1use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7 client::MicrogridClientHandle,
8 client::proto::common::microgrid::electrical_components::{
9 ElectricalComponent, ElectricalComponentConnection,
10 },
11 error::Error,
12 metric,
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph};
15use std::collections::BTreeSet;
16use tokio::sync::mpsc;
17
18use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
19
20#[derive(Clone)]
22pub struct LogicalMeterHandle {
23 instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
24 graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
25}
26
27impl LogicalMeterHandle {
28 pub async fn try_new(
30 client: MicrogridClientHandle,
31 config: LogicalMeterConfig,
32 ) -> Result<Self, Error> {
33 Self::try_new_with_clock(client, config, crate::wall_clock_timer::SystemClock).await
34 }
35
36 pub(crate) async fn try_new_with_clock<C: crate::wall_clock_timer::Clock + 'static>(
37 client: MicrogridClientHandle,
38 config: LogicalMeterConfig,
39 clock: C,
40 ) -> Result<Self, Error> {
41 let (sender, receiver) = mpsc::channel(8);
42 let graph = ComponentGraph::try_new(
43 client.list_electrical_components(vec![], vec![]).await?,
44 client
45 .list_electrical_component_connections(vec![], vec![])
46 .await?,
47 frequenz_microgrid_component_graph::ComponentGraphConfig {
48 allow_component_validation_failures: true,
49 allow_unconnected_components: true,
50 allow_unspecified_inverters: false,
51 disable_fallback_components: false,
52 },
53 )
54 .map_err(|e| {
55 Error::component_graph_error(format!("Unable to create a component graph: {e}"))
56 })?;
57
58 let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?;
59
60 tokio::task::spawn(async move {
61 logical_meter.run().await;
62 });
63
64 Ok(Self {
65 instructions_tx: sender,
66 graph,
67 })
68 }
69
70 pub fn grid<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
73 Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
74 &self.graph,
75 self.instructions_tx.clone(),
76 )?)))
77 }
78
79 pub fn battery<M: metric::Metric>(
84 &self,
85 component_ids: Option<BTreeSet<u64>>,
86 ) -> Result<Formula<M::QuantityType>, Error> {
87 Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
88 &self.graph,
89 self.instructions_tx.clone(),
90 component_ids,
91 )?)))
92 }
93
94 pub fn chp<M: metric::Metric>(
99 &self,
100 component_ids: Option<BTreeSet<u64>>,
101 ) -> Result<Formula<M::QuantityType>, Error> {
102 Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
103 &self.graph,
104 self.instructions_tx.clone(),
105 component_ids,
106 )?)))
107 }
108
109 pub fn pv<M: metric::Metric>(
114 &self,
115 component_ids: Option<BTreeSet<u64>>,
116 ) -> Result<Formula<M::QuantityType>, Error> {
117 Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
118 &self.graph,
119 self.instructions_tx.clone(),
120 component_ids,
121 )?)))
122 }
123
124 pub fn ev_charger<M: metric::Metric>(
130 &self,
131 component_ids: Option<BTreeSet<u64>>,
132 ) -> Result<Formula<M::QuantityType>, Error> {
133 Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
134 &self.graph,
135 self.instructions_tx.clone(),
136 component_ids,
137 )?)))
138 }
139
140 pub fn consumer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
143 Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
144 &self.graph,
145 self.instructions_tx.clone(),
146 )?)))
147 }
148
149 pub fn producer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
152 Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
153 &self.graph,
154 self.instructions_tx.clone(),
155 )?)))
156 }
157
158 pub fn component<M: metric::Metric>(
161 &self,
162 component_id: u64,
163 ) -> Result<Formula<M::QuantityType>, Error> {
164 Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
165 &self.graph,
166 self.instructions_tx.clone(),
167 component_id,
168 )?)))
169 }
170
171 pub fn graph(&self) -> &ComponentGraph<ElectricalComponent, ElectricalComponentConnection> {
173 &self.graph
174 }
175}
176
177#[cfg(test)]
178mod tests {
179 use chrono::TimeDelta;
180 use frequenz_resampling::ResamplingFunction;
181 use tokio_stream::{StreamExt, wrappers::BroadcastStream};
182
183 use crate::{
184 LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
185 client::test_utils::{
186 MockComponent,
187 MockMicrogridApiClient, },
189 logical_meter::formula::Formula,
190 quantity::Quantity,
191 };
192
193 async fn new_logical_meter_handle(config: Option<LogicalMeterConfig>) -> LogicalMeterHandle {
194 let api_client = MockMicrogridApiClient::new(
195 MockComponent::grid(1).with_children(vec![
197 MockComponent::meter(2)
199 .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
200 .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
201 .with_children(vec![
202 MockComponent::meter(3)
204 .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
205 .with_children(vec![
206 MockComponent::pv_inverter(4),
208 ]),
209 MockComponent::meter(5).with_children(vec![
211 MockComponent::battery_inverter(6)
213 .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
214 .with_children(vec![
215 MockComponent::battery(7),
217 ]),
218 MockComponent::battery_inverter(8)
220 .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
221 .with_children(vec![
222 MockComponent::battery(9),
224 ]),
225 ]),
226 MockComponent::meter(10)
228 .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
229 MockComponent::meter(11).with_children(vec![
231 MockComponent::chp(12),
233 ]),
234 MockComponent::meter(13).with_children(vec![
236 MockComponent::ev_charger(14),
238 MockComponent::ev_charger(15),
239 ]),
240 ]),
241 ]),
242 );
243
244 let clock = api_client.clock();
245 LogicalMeterHandle::try_new_with_clock(
246 MicrogridClientHandle::new_from_client(api_client),
247 config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())),
248 clock,
249 )
250 .await
251 .unwrap()
252 }
253
254 #[tokio::test]
255 async fn test_formula_display() {
256 let lm = new_logical_meter_handle(None).await;
257
258 let formula = lm.grid::<crate::metric::AcPowerActive>().unwrap();
259 assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
260
261 let formula = lm.battery::<crate::metric::AcPowerReactive>(None).unwrap();
262 assert_eq!(
263 formula.to_string(),
264 "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
265 );
266
267 let formula = lm
268 .battery::<crate::metric::AcPowerActive>(Some([9].into()))
269 .unwrap();
270 assert_eq!(
271 formula.to_string(),
272 "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
273 );
274
275 let formula = lm
276 .battery::<crate::metric::AcVoltage>(Some([7].into()))
277 .unwrap();
278 assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");
279
280 let formula = lm.battery::<crate::metric::AcFrequency>(None).unwrap();
281 assert_eq!(
282 formula.to_string(),
283 "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
284 );
285
286 let formula = lm.pv::<crate::metric::AcPowerReactive>(None).unwrap();
287 assert_eq!(
288 formula.to_string(),
289 "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
290 );
291
292 let formula = lm.chp::<crate::metric::AcPowerActive>(None).unwrap();
293 assert_eq!(
294 formula.to_string(),
295 "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
296 );
297
298 let formula = lm.ev_charger::<crate::metric::AcCurrent>(None).unwrap();
299 assert_eq!(
300 formula.to_string(),
301 "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
302 );
303
304 let formula = lm.consumer::<crate::metric::AcCurrent>().unwrap();
305 assert_eq!(
306 formula.to_string(),
307 concat!(
308 "METRIC_AC_CURRENT::(MAX(",
309 "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
310 "- #10 - COALESCE(#11, #12, 0.0)",
311 " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
312 " 0.0)",
313 " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
314 " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
315 " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
316 ")"
317 )
318 );
319
320 let formula = lm.producer::<crate::metric::AcPowerActive>().unwrap();
321 assert_eq!(
322 formula.to_string(),
323 concat!(
324 "METRIC_AC_POWER_ACTIVE::(",
325 "MIN(COALESCE(#4, #3, 0.0), 0.0)",
326 " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
327 ")"
328 )
329 );
330
331 let formula = lm.component::<crate::metric::AcCurrent>(10).unwrap();
332 assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
333 }
334
335 #[tokio::test(start_paused = true)]
336 async fn test_grid_power_formula() {
337 let formula = new_logical_meter_handle(None)
338 .await
339 .grid::<crate::metric::AcPowerActive>()
340 .unwrap();
341
342 let samples = fetch_samples(formula, 10).await;
343
344 check_samples(
345 samples,
346 |q| q.as_watts(),
347 TimeDelta::try_seconds(1).unwrap(),
348 vec![
349 Some(5.8),
350 Some(6.0),
351 Some(6.0),
352 Some(7.0),
353 Some(5.8),
354 Some(6.0),
355 Some(6.0),
356 Some(7.0),
357 Some(5.8),
358 Some(6.0),
359 ],
360 )
361 }
362
363 #[tokio::test(start_paused = true)]
364 async fn test_pv_reactive_power_formula() {
365 let formula = new_logical_meter_handle(None)
366 .await
367 .pv::<crate::metric::AcPowerReactive>(None)
368 .unwrap();
369
370 let samples = fetch_samples(formula, 10).await;
371
372 check_samples(
373 samples,
374 |q| q.as_volt_amperes_reactive(),
375 TimeDelta::try_seconds(1).unwrap(),
376 vec![
377 Some(-1.4),
378 Some(-0.5),
379 Some(-0.5),
380 Some(4.0),
381 Some(-1.4),
382 Some(-0.5),
383 Some(-0.5),
384 Some(4.0),
385 Some(-1.4),
386 Some(-0.5),
387 ],
388 )
389 }
390
391 #[tokio::test(start_paused = true)]
392 async fn test_battery_voltage_formula() {
393 let formula = new_logical_meter_handle(None)
394 .await
395 .battery::<crate::metric::AcVoltage>(None)
396 .unwrap();
397
398 let samples = fetch_samples(formula, 10).await;
399 check_samples(
400 samples,
401 |q| q.as_volts(),
402 TimeDelta::try_seconds(1).unwrap(),
403 vec![
404 Some(398.0),
405 Some(397.67),
406 Some(397.67),
407 Some(396.0),
408 Some(398.0),
409 Some(397.67),
410 Some(397.67),
411 Some(396.0),
412 Some(398.0),
413 Some(397.67),
414 ],
415 )
416 }
417
418 #[tokio::test(start_paused = true)]
419 async fn test_resampling_functions() {
420 let lm_config = Some(
421 LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
422 .with_default_resampling_function(ResamplingFunction::Count)
423 .override_resampling_function::<crate::metric::AcVoltage>(ResamplingFunction::Last),
424 );
425 let lm = new_logical_meter_handle(lm_config).await;
426 let bat_volt_formula = lm.battery::<crate::metric::AcVoltage>(None).unwrap();
427
428 let samples = fetch_samples(bat_volt_formula, 10).await;
429 check_samples(
430 samples,
431 |q| q.as_volts(),
432 TimeDelta::try_milliseconds(200).unwrap(),
433 vec![
434 Some(400.0),
435 Some(400.0),
436 Some(398.0),
437 Some(396.0),
438 Some(396.0),
439 Some(396.0),
440 Some(396.0),
441 Some(396.0),
442 None,
443 None,
444 ],
445 );
446
447 let cons_pow_formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
448
449 let samples = fetch_samples(cons_pow_formula, 10).await;
450 check_samples(
451 samples,
452 |q| q.as_watts(),
453 TimeDelta::try_milliseconds(200).unwrap(),
454 vec![
455 Some(1.0),
456 Some(2.0),
457 Some(3.0),
458 Some(3.0),
459 Some(3.0),
460 Some(3.0),
461 Some(2.0),
462 Some(1.0),
463 Some(0.0),
464 Some(0.0),
465 ],
466 );
467 }
468
469 #[tokio::test(start_paused = true)]
470 async fn test_max_age_in_intervals() {
471 let lm_config = Some(
472 LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
473 .with_max_age_in_intervals(1)
474 .with_default_resampling_function(ResamplingFunction::Count),
475 );
476 let lm = new_logical_meter_handle(lm_config).await;
477 let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
478
479 let samples = fetch_samples(formula, 8).await;
480 check_samples(
481 samples,
482 |q| q.as_watts(),
483 TimeDelta::try_milliseconds(200).unwrap(),
484 vec![
485 Some(1.0),
486 Some(1.0),
487 Some(1.0),
488 Some(1.0),
489 Some(1.0),
490 Some(1.0),
491 Some(0.0),
492 Some(0.0),
493 ],
494 );
495
496 let lm_config = Some(
497 LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
498 .with_max_age_in_intervals(3)
499 .with_default_resampling_function(ResamplingFunction::Count),
500 );
501 let lm = new_logical_meter_handle(lm_config).await;
502 let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
503
504 let samples = fetch_samples(formula, 10).await;
505 check_samples(
506 samples,
507 |q| q.as_watts(),
508 TimeDelta::try_milliseconds(200).unwrap(),
509 vec![
510 Some(1.0),
511 Some(2.0),
512 Some(3.0),
513 Some(3.0),
514 Some(3.0),
515 Some(3.0),
516 Some(2.0),
517 Some(1.0),
518 Some(0.0),
519 Some(0.0),
520 ],
521 )
522 }
523
524 #[tokio::test(start_paused = true)]
525 async fn test_consumer_current_formula() {
526 let formula = new_logical_meter_handle(None)
527 .await
528 .consumer::<crate::metric::AcCurrent>()
529 .unwrap();
530
531 let samples = fetch_samples(formula, 10).await;
532 check_samples(
533 samples,
534 |q| q.as_amperes(),
535 TimeDelta::try_seconds(1).unwrap(),
536 vec![
537 Some(15.0),
538 Some(14.75),
539 Some(14.75),
540 Some(13.5),
541 Some(15.0),
542 Some(14.75),
543 Some(14.75),
544 Some(13.5),
545 Some(15.0),
546 Some(14.75),
547 ],
548 )
549 }
550
551 async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
552 let rx = formula.subscribe().await.unwrap();
553
554 BroadcastStream::new(rx)
555 .take(num_values)
556 .map(|x| x.unwrap())
557 .collect()
558 .await
559 }
560
561 #[track_caller]
562 fn check_samples<Q: Quantity>(
563 samples: Vec<Sample<Q>>,
564 extractor: impl Fn(Q) -> f32,
565 expected_interval: TimeDelta,
566 expected_values: Vec<Option<f32>>,
567 ) {
568 let values = samples
569 .iter()
570 .map(|res| res.value().map(&extractor))
571 .collect::<Vec<_>>();
572
573 samples.as_slice().windows(2).for_each(|w| {
574 assert_eq!(
575 w[1].timestamp() - w[0].timestamp(),
576 expected_interval,
577 "Samples are not spaced at the expected interval"
578 );
579 });
580
581 for (id, (v, ev)) in values.iter().zip(expected_values.iter()).enumerate() {
582 match (v, ev) {
583 (Some(v), Some(ev)) => assert!(
584 (v - ev).abs() < 0.01,
585 "Item {id} - expected value {ev:?}, got value {v:?}"
586 ),
587 (None, None) => {}
588 _ => panic!("Item {id} - expected value {ev:?}, got value {v:?}"),
589 }
590 }
591 }
592}