Skip to main content

frequenz_microgrid/logical_meter/
logical_meter_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7    client::MicrogridClientHandle,
8    client::proto::common::microgrid::electrical_components::{
9        ElectricalComponent, ElectricalComponentConnection,
10    },
11    error::Error,
12    metric,
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph};
15use std::collections::BTreeSet;
16use tokio::sync::mpsc;
17
18use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
19
20/// This provides an interface  stream high-level metrics from a microgrid.
21#[derive(Clone)]
22pub struct LogicalMeterHandle {
23    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
24    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
25}
26
27impl LogicalMeterHandle {
28    /// Creates a new LogicalMeter instance.
29    pub async fn try_new(
30        client: MicrogridClientHandle,
31        config: LogicalMeterConfig,
32    ) -> Result<Self, Error> {
33        Self::try_new_with_clock(client, config, crate::wall_clock_timer::SystemClock).await
34    }
35
36    pub(crate) async fn try_new_with_clock<C: crate::wall_clock_timer::Clock + 'static>(
37        client: MicrogridClientHandle,
38        config: LogicalMeterConfig,
39        clock: C,
40    ) -> Result<Self, Error> {
41        let (sender, receiver) = mpsc::channel(8);
42        let graph = ComponentGraph::try_new(
43            client.list_electrical_components(vec![], vec![]).await?,
44            client
45                .list_electrical_component_connections(vec![], vec![])
46                .await?,
47            frequenz_microgrid_component_graph::ComponentGraphConfig {
48                allow_component_validation_failures: true,
49                allow_unconnected_components: true,
50                allow_unspecified_inverters: false,
51                disable_fallback_components: false,
52            },
53        )
54        .map_err(|e| {
55            Error::component_graph_error(format!("Unable to create a component graph: {e}"))
56        })?;
57
58        let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?;
59
60        tokio::task::spawn(async move {
61            logical_meter.run().await;
62        });
63
64        Ok(Self {
65            instructions_tx: sender,
66            graph,
67        })
68    }
69
70    /// Returns a receiver that streams samples for the given `metric` at the grid
71    /// connection point.
72    pub fn grid<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
73        Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
74            &self.graph,
75            self.instructions_tx.clone(),
76        )?)))
77    }
78
79    /// Returns a receiver that streams samples for the given `metric` for the
80    /// given battery IDs.
81    ///
82    /// When `component_ids` is `None`, all batteries in the microgrid are used.
83    pub fn battery<M: metric::Metric>(
84        &self,
85        component_ids: Option<BTreeSet<u64>>,
86    ) -> Result<Formula<M::QuantityType>, Error> {
87        Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
88            &self.graph,
89            self.instructions_tx.clone(),
90            component_ids,
91        )?)))
92    }
93
94    /// Returns a receiver that streams samples for the given `metric` for the
95    /// given CHP IDs.
96    ///
97    /// When `component_ids` is `None`, all CHPs in the microgrid are used.
98    pub fn chp<M: metric::Metric>(
99        &self,
100        component_ids: Option<BTreeSet<u64>>,
101    ) -> Result<Formula<M::QuantityType>, Error> {
102        Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
103            &self.graph,
104            self.instructions_tx.clone(),
105            component_ids,
106        )?)))
107    }
108
109    /// Returns a receiver that streams samples for the given `metric` for the
110    /// given PV IDs.
111    ///
112    /// When `component_ids` is `None`, all PVs in the microgrid are used.
113    pub fn pv<M: metric::Metric>(
114        &self,
115        component_ids: Option<BTreeSet<u64>>,
116    ) -> Result<Formula<M::QuantityType>, Error> {
117        Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
118            &self.graph,
119            self.instructions_tx.clone(),
120            component_ids,
121        )?)))
122    }
123
124    /// Returns a receiver that streams samples for the given `metric` for the
125    /// given EV charger IDs.
126    ///
127    /// When `component_ids` is `None`, all EV chargers in the microgrid are
128    /// used.
129    pub fn ev_charger<M: metric::Metric>(
130        &self,
131        component_ids: Option<BTreeSet<u64>>,
132    ) -> Result<Formula<M::QuantityType>, Error> {
133        Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
134            &self.graph,
135            self.instructions_tx.clone(),
136            component_ids,
137        )?)))
138    }
139
140    /// Returns a receiver that streams samples for the given `metric` for the
141    /// logical `consumer` in the microgrid.
142    pub fn consumer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
143        Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
144            &self.graph,
145            self.instructions_tx.clone(),
146        )?)))
147    }
148
149    /// Returns a receiver that streams samples for the given `metric` for the
150    /// logical `producer` in the microgrid.
151    pub fn producer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
152        Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
153            &self.graph,
154            self.instructions_tx.clone(),
155        )?)))
156    }
157
158    /// Returns a receiver that streams samples for the given `metric` for the
159    /// given component ID.
160    pub fn component<M: metric::Metric>(
161        &self,
162        component_id: u64,
163    ) -> Result<Formula<M::QuantityType>, Error> {
164        Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
165            &self.graph,
166            self.instructions_tx.clone(),
167            component_id,
168        )?)))
169    }
170
171    /// Returns a reference to the component graph.
172    pub fn graph(&self) -> &ComponentGraph<ElectricalComponent, ElectricalComponentConnection> {
173        &self.graph
174    }
175}
176
177#[cfg(test)]
178mod tests {
179    use chrono::TimeDelta;
180    use frequenz_resampling::ResamplingFunction;
181    use tokio_stream::{StreamExt, wrappers::BroadcastStream};
182
183    use crate::{
184        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
185        client::test_utils::{
186            MockComponent,
187            MockMicrogridApiClient, //
188        },
189        logical_meter::formula::Formula,
190        quantity::Quantity,
191    };
192
193    async fn new_logical_meter_handle(config: Option<LogicalMeterConfig>) -> LogicalMeterHandle {
194        let api_client = MockMicrogridApiClient::new(
195            // Grid connection point
196            MockComponent::grid(1).with_children(vec![
197                // Main meter
198                MockComponent::meter(2)
199                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
200                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
201                    .with_children(vec![
202                        // PV meter
203                        MockComponent::meter(3)
204                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
205                            .with_children(vec![
206                                // PV inverter
207                                MockComponent::pv_inverter(4),
208                            ]),
209                        // Battery meter
210                        MockComponent::meter(5).with_children(vec![
211                            // Battery inverter
212                            MockComponent::battery_inverter(6)
213                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
214                                .with_children(vec![
215                                    // Battery
216                                    MockComponent::battery(7),
217                                ]),
218                            // Battery inverter
219                            MockComponent::battery_inverter(8)
220                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
221                                .with_children(vec![
222                                    // Battery
223                                    MockComponent::battery(9),
224                                ]),
225                        ]),
226                        // Consumer meter
227                        MockComponent::meter(10)
228                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
229                        // Chp meter
230                        MockComponent::meter(11).with_children(vec![
231                            // Chp
232                            MockComponent::chp(12),
233                        ]),
234                        // Ev charger meter
235                        MockComponent::meter(13).with_children(vec![
236                            // Ev chargers
237                            MockComponent::ev_charger(14),
238                            MockComponent::ev_charger(15),
239                        ]),
240                    ]),
241            ]),
242        );
243
244        let clock = api_client.clock();
245        LogicalMeterHandle::try_new_with_clock(
246            MicrogridClientHandle::new_from_client(api_client),
247            config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())),
248            clock,
249        )
250        .await
251        .unwrap()
252    }
253
254    #[tokio::test]
255    async fn test_formula_display() {
256        let lm = new_logical_meter_handle(None).await;
257
258        let formula = lm.grid::<crate::metric::AcPowerActive>().unwrap();
259        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
260
261        let formula = lm.battery::<crate::metric::AcPowerReactive>(None).unwrap();
262        assert_eq!(
263            formula.to_string(),
264            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
265        );
266
267        let formula = lm
268            .battery::<crate::metric::AcPowerActive>(Some([9].into()))
269            .unwrap();
270        assert_eq!(
271            formula.to_string(),
272            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
273        );
274
275        let formula = lm
276            .battery::<crate::metric::AcVoltage>(Some([7].into()))
277            .unwrap();
278        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");
279
280        let formula = lm.battery::<crate::metric::AcFrequency>(None).unwrap();
281        assert_eq!(
282            formula.to_string(),
283            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
284        );
285
286        let formula = lm.pv::<crate::metric::AcPowerReactive>(None).unwrap();
287        assert_eq!(
288            formula.to_string(),
289            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
290        );
291
292        let formula = lm.chp::<crate::metric::AcPowerActive>(None).unwrap();
293        assert_eq!(
294            formula.to_string(),
295            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
296        );
297
298        let formula = lm.ev_charger::<crate::metric::AcCurrent>(None).unwrap();
299        assert_eq!(
300            formula.to_string(),
301            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
302        );
303
304        let formula = lm.consumer::<crate::metric::AcCurrent>().unwrap();
305        assert_eq!(
306            formula.to_string(),
307            concat!(
308                "METRIC_AC_CURRENT::(MAX(",
309                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
310                "- #10 - COALESCE(#11, #12, 0.0)",
311                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
312                " 0.0)",
313                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
314                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
315                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
316                ")"
317            )
318        );
319
320        let formula = lm.producer::<crate::metric::AcPowerActive>().unwrap();
321        assert_eq!(
322            formula.to_string(),
323            concat!(
324                "METRIC_AC_POWER_ACTIVE::(",
325                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
326                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
327                ")"
328            )
329        );
330
331        let formula = lm.component::<crate::metric::AcCurrent>(10).unwrap();
332        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
333    }
334
335    #[tokio::test(start_paused = true)]
336    async fn test_grid_power_formula() {
337        let formula = new_logical_meter_handle(None)
338            .await
339            .grid::<crate::metric::AcPowerActive>()
340            .unwrap();
341
342        let samples = fetch_samples(formula, 10).await;
343
344        check_samples(
345            samples,
346            |q| q.as_watts(),
347            TimeDelta::try_seconds(1).unwrap(),
348            vec![
349                Some(5.8),
350                Some(6.0),
351                Some(6.0),
352                Some(7.0),
353                Some(5.8),
354                Some(6.0),
355                Some(6.0),
356                Some(7.0),
357                Some(5.8),
358                Some(6.0),
359            ],
360        )
361    }
362
363    #[tokio::test(start_paused = true)]
364    async fn test_pv_reactive_power_formula() {
365        let formula = new_logical_meter_handle(None)
366            .await
367            .pv::<crate::metric::AcPowerReactive>(None)
368            .unwrap();
369
370        let samples = fetch_samples(formula, 10).await;
371
372        check_samples(
373            samples,
374            |q| q.as_volt_amperes_reactive(),
375            TimeDelta::try_seconds(1).unwrap(),
376            vec![
377                Some(-1.4),
378                Some(-0.5),
379                Some(-0.5),
380                Some(4.0),
381                Some(-1.4),
382                Some(-0.5),
383                Some(-0.5),
384                Some(4.0),
385                Some(-1.4),
386                Some(-0.5),
387            ],
388        )
389    }
390
391    #[tokio::test(start_paused = true)]
392    async fn test_battery_voltage_formula() {
393        let formula = new_logical_meter_handle(None)
394            .await
395            .battery::<crate::metric::AcVoltage>(None)
396            .unwrap();
397
398        let samples = fetch_samples(formula, 10).await;
399        check_samples(
400            samples,
401            |q| q.as_volts(),
402            TimeDelta::try_seconds(1).unwrap(),
403            vec![
404                Some(398.0),
405                Some(397.67),
406                Some(397.67),
407                Some(396.0),
408                Some(398.0),
409                Some(397.67),
410                Some(397.67),
411                Some(396.0),
412                Some(398.0),
413                Some(397.67),
414            ],
415        )
416    }
417
418    #[tokio::test(start_paused = true)]
419    async fn test_resampling_functions() {
420        let lm_config = Some(
421            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
422                .with_default_resampling_function(ResamplingFunction::Count)
423                .override_resampling_function::<crate::metric::AcVoltage>(ResamplingFunction::Last),
424        );
425        let lm = new_logical_meter_handle(lm_config).await;
426        let bat_volt_formula = lm.battery::<crate::metric::AcVoltage>(None).unwrap();
427
428        let samples = fetch_samples(bat_volt_formula, 10).await;
429        check_samples(
430            samples,
431            |q| q.as_volts(),
432            TimeDelta::try_milliseconds(200).unwrap(),
433            vec![
434                Some(400.0),
435                Some(400.0),
436                Some(398.0),
437                Some(396.0),
438                Some(396.0),
439                Some(396.0),
440                Some(396.0),
441                Some(396.0),
442                None,
443                None,
444            ],
445        );
446
447        let cons_pow_formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
448
449        let samples = fetch_samples(cons_pow_formula, 10).await;
450        check_samples(
451            samples,
452            |q| q.as_watts(),
453            TimeDelta::try_milliseconds(200).unwrap(),
454            vec![
455                Some(1.0),
456                Some(2.0),
457                Some(3.0),
458                Some(3.0),
459                Some(3.0),
460                Some(3.0),
461                Some(2.0),
462                Some(1.0),
463                Some(0.0),
464                Some(0.0),
465            ],
466        );
467    }
468
469    #[tokio::test(start_paused = true)]
470    async fn test_max_age_in_intervals() {
471        let lm_config = Some(
472            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
473                .with_max_age_in_intervals(1)
474                .with_default_resampling_function(ResamplingFunction::Count),
475        );
476        let lm = new_logical_meter_handle(lm_config).await;
477        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
478
479        let samples = fetch_samples(formula, 8).await;
480        check_samples(
481            samples,
482            |q| q.as_watts(),
483            TimeDelta::try_milliseconds(200).unwrap(),
484            vec![
485                Some(1.0),
486                Some(1.0),
487                Some(1.0),
488                Some(1.0),
489                Some(1.0),
490                Some(1.0),
491                Some(0.0),
492                Some(0.0),
493            ],
494        );
495
496        let lm_config = Some(
497            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
498                .with_max_age_in_intervals(3)
499                .with_default_resampling_function(ResamplingFunction::Count),
500        );
501        let lm = new_logical_meter_handle(lm_config).await;
502        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();
503
504        let samples = fetch_samples(formula, 10).await;
505        check_samples(
506            samples,
507            |q| q.as_watts(),
508            TimeDelta::try_milliseconds(200).unwrap(),
509            vec![
510                Some(1.0),
511                Some(2.0),
512                Some(3.0),
513                Some(3.0),
514                Some(3.0),
515                Some(3.0),
516                Some(2.0),
517                Some(1.0),
518                Some(0.0),
519                Some(0.0),
520            ],
521        )
522    }
523
524    #[tokio::test(start_paused = true)]
525    async fn test_consumer_current_formula() {
526        let formula = new_logical_meter_handle(None)
527            .await
528            .consumer::<crate::metric::AcCurrent>()
529            .unwrap();
530
531        let samples = fetch_samples(formula, 10).await;
532        check_samples(
533            samples,
534            |q| q.as_amperes(),
535            TimeDelta::try_seconds(1).unwrap(),
536            vec![
537                Some(15.0),
538                Some(14.75),
539                Some(14.75),
540                Some(13.5),
541                Some(15.0),
542                Some(14.75),
543                Some(14.75),
544                Some(13.5),
545                Some(15.0),
546                Some(14.75),
547            ],
548        )
549    }
550
551    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
552        let rx = formula.subscribe().await.unwrap();
553
554        BroadcastStream::new(rx)
555            .take(num_values)
556            .map(|x| x.unwrap())
557            .collect()
558            .await
559    }
560
561    #[track_caller]
562    fn check_samples<Q: Quantity>(
563        samples: Vec<Sample<Q>>,
564        extractor: impl Fn(Q) -> f32,
565        expected_interval: TimeDelta,
566        expected_values: Vec<Option<f32>>,
567    ) {
568        let values = samples
569            .iter()
570            .map(|res| res.value().map(&extractor))
571            .collect::<Vec<_>>();
572
573        samples.as_slice().windows(2).for_each(|w| {
574            assert_eq!(
575                w[1].timestamp() - w[0].timestamp(),
576                expected_interval,
577                "Samples are not spaced at the expected interval"
578            );
579        });
580
581        for (id, (v, ev)) in values.iter().zip(expected_values.iter()).enumerate() {
582            match (v, ev) {
583                (Some(v), Some(ev)) => assert!(
584                    (v - ev).abs() < 0.01,
585                    "Item {id} - expected value {ev:?}, got value {v:?}"
586                ),
587                (None, None) => {}
588                _ => panic!("Item {id} - expected value {ev:?}, got value {v:?}"),
589            }
590        }
591    }
592}