frequenz_microgrid/logical_meter/
logical_meter_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7    client::MicrogridClientHandle,
8    error::Error,
9    proto::common::microgrid::electrical_components::{
10        ElectricalComponent, ElectricalComponentConnection,
11    },
12};
13use frequenz_microgrid_component_graph::{self, ComponentGraph};
14use std::collections::BTreeSet;
15use tokio::sync::mpsc;
16
17use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
18
19/// This provides an interface  stream high-level metrics from a microgrid.
20#[derive(Clone)]
21pub struct LogicalMeterHandle {
22    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
23    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
24}
25
26impl LogicalMeterHandle {
27    /// Creates a new LogicalMeter instance.
28    pub async fn try_new(
29        client: MicrogridClientHandle,
30        config: LogicalMeterConfig,
31    ) -> Result<Self, Error> {
32        let (sender, receiver) = mpsc::channel(8);
33        let graph = ComponentGraph::try_new(
34            client.list_electrical_components(vec![], vec![]).await?,
35            client
36                .list_electrical_component_connections(vec![], vec![])
37                .await?,
38            frequenz_microgrid_component_graph::ComponentGraphConfig {
39                allow_component_validation_failures: true,
40                allow_unconnected_components: true,
41                allow_unspecified_inverters: false,
42                disable_fallback_components: false,
43            },
44        )
45        .map_err(|e| {
46            Error::component_graph_error(format!("Unable to create a component graph: {e}"))
47        })?;
48
49        let logical_meter = LogicalMeterActor::try_new(receiver, client, config)?;
50
51        tokio::task::spawn(async move {
52            logical_meter.run().await;
53        });
54
55        Ok(Self {
56            instructions_tx: sender,
57            graph,
58        })
59    }
60
61    /// Returns a receiver that streams samples for the given `metric` at the grid
62    /// connection point.
63    pub fn grid<M: super::metric::Metric>(
64        &mut self,
65        metric: M,
66    ) -> Result<Formula<M::QuantityType>, Error> {
67        Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
68            &self.graph,
69            metric,
70            self.instructions_tx.clone(),
71        )?)))
72    }
73
74    /// Returns a receiver that streams samples for the given `metric` for the
75    /// given battery IDs.
76    ///
77    /// When `component_ids` is `None`, all batteries in the microgrid are used.
78    pub fn battery<M: super::metric::Metric>(
79        &mut self,
80        component_ids: Option<BTreeSet<u64>>,
81        metric: M,
82    ) -> Result<Formula<M::QuantityType>, Error> {
83        Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
84            &self.graph,
85            metric,
86            self.instructions_tx.clone(),
87            component_ids,
88        )?)))
89    }
90
91    /// Returns a receiver that streams samples for the given `metric` for the
92    /// given CHP IDs.
93    ///
94    /// When `component_ids` is `None`, all CHPs in the microgrid are used.
95    pub fn chp<M: super::metric::Metric>(
96        &mut self,
97        component_ids: Option<BTreeSet<u64>>,
98        metric: M,
99    ) -> Result<Formula<M::QuantityType>, Error> {
100        Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
101            &self.graph,
102            metric,
103            self.instructions_tx.clone(),
104            component_ids,
105        )?)))
106    }
107
108    /// Returns a receiver that streams samples for the given `metric` for the
109    /// given PV IDs.
110    ///
111    /// When `component_ids` is `None`, all PVs in the microgrid are used.
112    pub fn pv<M: super::metric::Metric>(
113        &mut self,
114        component_ids: Option<BTreeSet<u64>>,
115        metric: M,
116    ) -> Result<Formula<M::QuantityType>, Error> {
117        Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
118            &self.graph,
119            metric,
120            self.instructions_tx.clone(),
121            component_ids,
122        )?)))
123    }
124
125    /// Returns a receiver that streams samples for the given `metric` for the
126    /// given EV charger IDs.
127    ///
128    /// When `component_ids` is `None`, all EV chargers in the microgrid are
129    /// used.
130    pub fn ev_charger<M: super::metric::Metric>(
131        &mut self,
132        component_ids: Option<BTreeSet<u64>>,
133        metric: M,
134    ) -> Result<Formula<M::QuantityType>, Error> {
135        Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
136            &self.graph,
137            metric,
138            self.instructions_tx.clone(),
139            component_ids,
140        )?)))
141    }
142
143    /// Returns a receiver that streams samples for the given `metric` for the
144    /// logical `consumer` in the microgrid.
145    pub fn consumer<M: super::metric::Metric>(
146        &mut self,
147        metric: M,
148    ) -> Result<Formula<M::QuantityType>, Error> {
149        Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
150            &self.graph,
151            metric,
152            self.instructions_tx.clone(),
153        )?)))
154    }
155
156    /// Returns a receiver that streams samples for the given `metric` for the
157    /// logical `producer` in the microgrid.
158    pub fn producer<M: super::metric::Metric>(
159        &mut self,
160        metric: M,
161    ) -> Result<Formula<M::QuantityType>, Error> {
162        Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
163            &self.graph,
164            metric,
165            self.instructions_tx.clone(),
166        )?)))
167    }
168
169    /// Returns a receiver that streams samples for the given `metric` for the
170    /// given component ID.
171    pub fn component<M: super::metric::Metric>(
172        &mut self,
173        component_id: u64,
174        metric: M,
175    ) -> Result<Formula<M::QuantityType>, Error> {
176        Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
177            &self.graph,
178            metric,
179            self.instructions_tx.clone(),
180            component_id,
181        )?)))
182    }
183}
184
185#[cfg(test)]
186mod tests {
187    use chrono::TimeDelta;
188    use tokio_stream::{StreamExt, wrappers::BroadcastStream};
189
190    use crate::{
191        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
192        client::test_utils::{
193            MockComponent,
194            MockMicrogridApiClient, //
195        },
196        logical_meter::formula::Formula,
197        quantity::Quantity,
198    };
199
200    async fn new_logical_meter_handle() -> LogicalMeterHandle {
201        let api_client = MockMicrogridApiClient::new(
202            // Grid connection point
203            MockComponent::grid(1).with_children(vec![
204                // Main meter
205                MockComponent::meter(2)
206                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
207                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
208                    .with_children(vec![
209                        // PV meter
210                        MockComponent::meter(3)
211                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
212                            .with_children(vec![
213                                // PV inverter
214                                MockComponent::pv_inverter(4),
215                            ]),
216                        // Battery meter
217                        MockComponent::meter(5).with_children(vec![
218                            // Battery inverter
219                            MockComponent::battery_inverter(6)
220                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
221                                .with_children(vec![
222                                    // Battery
223                                    MockComponent::battery(7),
224                                ]),
225                            // Battery inverter
226                            MockComponent::battery_inverter(8)
227                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
228                                .with_children(vec![
229                                    // Battery
230                                    MockComponent::battery(9),
231                                ]),
232                        ]),
233                        // Consumer meter
234                        MockComponent::meter(10)
235                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
236                        // Chp meter
237                        MockComponent::meter(11).with_children(vec![
238                            // Chp
239                            MockComponent::chp(12),
240                        ]),
241                        // Ev charger meter
242                        MockComponent::meter(13).with_children(vec![
243                            // Ev chargers
244                            MockComponent::ev_charger(14),
245                            MockComponent::ev_charger(15),
246                        ]),
247                    ]),
248            ]),
249        );
250
251        LogicalMeterHandle::try_new(
252            MicrogridClientHandle::new_from_client(api_client),
253            LogicalMeterConfig {
254                resampling_interval: TimeDelta::try_seconds(1).unwrap(),
255            },
256        )
257        .await
258        .unwrap()
259    }
260
261    #[tokio::test]
262    async fn test_formula_display() {
263        let mut lm = new_logical_meter_handle().await;
264
265        let formula = lm.grid(crate::metric::AcPowerActive).unwrap();
266        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
267
268        let formula = lm.battery(None, crate::metric::AcPowerReactive).unwrap();
269        assert_eq!(
270            formula.to_string(),
271            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
272        );
273
274        let formula = lm
275            .battery(Some([9].into()), crate::metric::AcPowerActive)
276            .unwrap();
277        assert_eq!(
278            formula.to_string(),
279            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
280        );
281
282        let formula = lm
283            .battery(Some([7].into()), crate::metric::AcVoltage)
284            .unwrap();
285        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");
286
287        let formula = lm.battery(None, crate::metric::AcFrequency).unwrap();
288        assert_eq!(
289            formula.to_string(),
290            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
291        );
292
293        let formula = lm.pv(None, crate::metric::AcPowerReactive).unwrap();
294        assert_eq!(
295            formula.to_string(),
296            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
297        );
298
299        let formula = lm.chp(None, crate::metric::AcPowerActive).unwrap();
300        assert_eq!(
301            formula.to_string(),
302            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
303        );
304
305        let formula = lm.ev_charger(None, crate::metric::AcCurrent).unwrap();
306        assert_eq!(
307            formula.to_string(),
308            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
309        );
310
311        let formula = lm.consumer(crate::metric::AcCurrent).unwrap();
312        assert_eq!(
313            formula.to_string(),
314            concat!(
315                "METRIC_AC_CURRENT::(MAX(",
316                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
317                "- #10 - COALESCE(#11, #12, 0.0)",
318                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
319                " 0.0)",
320                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
321                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
322                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
323                ")"
324            )
325        );
326
327        let formula = lm.producer(crate::metric::AcPowerActive).unwrap();
328        assert_eq!(
329            formula.to_string(),
330            concat!(
331                "METRIC_AC_POWER_ACTIVE::(",
332                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
333                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
334                ")"
335            )
336        );
337
338        let formula = lm.component(10, crate::metric::AcCurrent).unwrap();
339        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
340    }
341
342    #[tokio::test(start_paused = true)]
343    async fn test_grid_power_formula() {
344        let formula = new_logical_meter_handle()
345            .await
346            .grid(crate::metric::AcPowerActive)
347            .unwrap();
348
349        let samples = fetch_samples(formula, 10).await;
350
351        check_samples(
352            samples,
353            |q| q.as_watts(),
354            vec![
355                Some(5.8),
356                Some(6.0),
357                Some(6.0),
358                Some(7.0),
359                Some(5.8),
360                Some(6.0),
361                Some(6.0),
362                Some(7.0),
363                Some(5.8),
364                Some(6.0),
365            ],
366        )
367    }
368
369    #[tokio::test(start_paused = true)]
370    async fn test_pv_reactive_power_formula() {
371        let formula = new_logical_meter_handle()
372            .await
373            .pv(None, crate::metric::AcPowerReactive)
374            .unwrap();
375
376        let samples = fetch_samples(formula, 10).await;
377
378        check_samples(
379            samples,
380            |q| q.as_volt_amperes_reactive(),
381            vec![
382                Some(-1.4),
383                Some(-0.5),
384                Some(-0.5),
385                Some(4.0),
386                Some(-1.4),
387                Some(-0.5),
388                Some(-0.5),
389                Some(4.0),
390                Some(-1.4),
391                Some(-0.5),
392            ],
393        )
394    }
395
396    #[tokio::test(start_paused = true)]
397    async fn test_battery_voltage_formula() {
398        let formula = new_logical_meter_handle()
399            .await
400            .battery(None, crate::metric::AcVoltage)
401            .unwrap();
402
403        let samples = fetch_samples(formula, 10).await;
404        check_samples(
405            samples,
406            |q| q.as_volts(),
407            vec![
408                Some(398.0),
409                Some(397.67),
410                Some(397.67),
411                Some(396.0),
412                Some(398.0),
413                Some(397.67),
414                Some(397.67),
415                Some(396.0),
416                Some(398.0),
417                Some(397.67),
418            ],
419        )
420    }
421
422    #[tokio::test(start_paused = true)]
423    async fn test_consumer_current_formula() {
424        let formula = new_logical_meter_handle()
425            .await
426            .consumer(crate::metric::AcCurrent)
427            .unwrap();
428
429        let samples = fetch_samples(formula, 10).await;
430        check_samples(
431            samples,
432            |q| q.as_amperes(),
433            vec![
434                Some(15.0),
435                Some(14.75),
436                Some(14.75),
437                Some(13.5),
438                Some(15.0),
439                Some(14.75),
440                Some(14.75),
441                Some(13.5),
442                Some(15.0),
443                Some(14.75),
444            ],
445        )
446    }
447
448    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
449        let rx = formula.subscribe().await.unwrap();
450
451        BroadcastStream::new(rx)
452            .take(num_values)
453            .map(|x| x.unwrap())
454            .collect()
455            .await
456    }
457
458    #[track_caller]
459    fn check_samples<Q: Quantity>(
460        samples: Vec<Sample<Q>>,
461        extractor: impl Fn(Q) -> f32,
462        expected_values: Vec<Option<f32>>,
463    ) {
464        let values = samples
465            .iter()
466            .map(|res| res.value().map(|v| extractor(v)))
467            .collect::<Vec<_>>();
468
469        let one_second = TimeDelta::try_seconds(1).unwrap();
470
471        samples.as_slice().windows(2).for_each(|w| {
472            assert_eq!(w[1].timestamp() - w[0].timestamp(), one_second);
473        });
474
475        for (v, ev) in values.iter().zip(expected_values.iter()) {
476            match (v, ev) {
477                (Some(v), Some(ev)) => assert!(
478                    (v - ev).abs() < 0.01,
479                    "expected value {ev:?}, got value {v:?}"
480                ),
481                (None, None) => {}
482                _ => panic!("expected value {ev:?}, got value {v:?}"),
483            }
484        }
485    }
486}