Skip to main content

frequenz_microgrid/logical_meter/
logical_meter_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7    client::MicrogridClientHandle,
8    error::Error,
9    metric,
10    proto::common::microgrid::electrical_components::{
11        ElectricalComponent, ElectricalComponentConnection,
12    },
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph};
15use std::collections::BTreeSet;
16use tokio::sync::mpsc;
17
18use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
19
20/// This provides an interface  stream high-level metrics from a microgrid.
21#[derive(Clone)]
22pub struct LogicalMeterHandle {
23    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
24    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
25}
26
27impl LogicalMeterHandle {
28    /// Creates a new LogicalMeter instance.
29    pub async fn try_new(
30        client: MicrogridClientHandle,
31        config: LogicalMeterConfig,
32    ) -> Result<Self, Error> {
33        let (sender, receiver) = mpsc::channel(8);
34        let graph = ComponentGraph::try_new(
35            client.list_electrical_components(vec![], vec![]).await?,
36            client
37                .list_electrical_component_connections(vec![], vec![])
38                .await?,
39            frequenz_microgrid_component_graph::ComponentGraphConfig {
40                allow_component_validation_failures: true,
41                allow_unconnected_components: true,
42                allow_unspecified_inverters: false,
43                disable_fallback_components: false,
44            },
45        )
46        .map_err(|e| {
47            Error::component_graph_error(format!("Unable to create a component graph: {e}"))
48        })?;
49
50        let logical_meter = LogicalMeterActor::try_new(receiver, client, config)?;
51
52        tokio::task::spawn(async move {
53            logical_meter.run().await;
54        });
55
56        Ok(Self {
57            instructions_tx: sender,
58            graph,
59        })
60    }
61
62    /// Returns a receiver that streams samples for the given `metric` at the grid
63    /// connection point.
64    pub fn grid<M: metric::Metric>(
65        &mut self,
66        metric: M,
67    ) -> Result<Formula<M::QuantityType>, Error> {
68        Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
69            &self.graph,
70            metric,
71            self.instructions_tx.clone(),
72        )?)))
73    }
74
75    /// Returns a receiver that streams samples for the given `metric` for the
76    /// given battery IDs.
77    ///
78    /// When `component_ids` is `None`, all batteries in the microgrid are used.
79    pub fn battery<M: metric::Metric>(
80        &mut self,
81        component_ids: Option<BTreeSet<u64>>,
82        metric: M,
83    ) -> Result<Formula<M::QuantityType>, Error> {
84        Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
85            &self.graph,
86            metric,
87            self.instructions_tx.clone(),
88            component_ids,
89        )?)))
90    }
91
92    /// Returns a receiver that streams samples for the given `metric` for the
93    /// given CHP IDs.
94    ///
95    /// When `component_ids` is `None`, all CHPs in the microgrid are used.
96    pub fn chp<M: metric::Metric>(
97        &mut self,
98        component_ids: Option<BTreeSet<u64>>,
99        metric: M,
100    ) -> Result<Formula<M::QuantityType>, Error> {
101        Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
102            &self.graph,
103            metric,
104            self.instructions_tx.clone(),
105            component_ids,
106        )?)))
107    }
108
109    /// Returns a receiver that streams samples for the given `metric` for the
110    /// given PV IDs.
111    ///
112    /// When `component_ids` is `None`, all PVs in the microgrid are used.
113    pub fn pv<M: metric::Metric>(
114        &mut self,
115        component_ids: Option<BTreeSet<u64>>,
116        metric: M,
117    ) -> Result<Formula<M::QuantityType>, Error> {
118        Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
119            &self.graph,
120            metric,
121            self.instructions_tx.clone(),
122            component_ids,
123        )?)))
124    }
125
126    /// Returns a receiver that streams samples for the given `metric` for the
127    /// given EV charger IDs.
128    ///
129    /// When `component_ids` is `None`, all EV chargers in the microgrid are
130    /// used.
131    pub fn ev_charger<M: metric::Metric>(
132        &mut self,
133        component_ids: Option<BTreeSet<u64>>,
134        metric: M,
135    ) -> Result<Formula<M::QuantityType>, Error> {
136        Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
137            &self.graph,
138            metric,
139            self.instructions_tx.clone(),
140            component_ids,
141        )?)))
142    }
143
144    /// Returns a receiver that streams samples for the given `metric` for the
145    /// logical `consumer` in the microgrid.
146    pub fn consumer<M: metric::Metric>(
147        &mut self,
148        metric: M,
149    ) -> Result<Formula<M::QuantityType>, Error> {
150        Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
151            &self.graph,
152            metric,
153            self.instructions_tx.clone(),
154        )?)))
155    }
156
157    /// Returns a receiver that streams samples for the given `metric` for the
158    /// logical `producer` in the microgrid.
159    pub fn producer<M: metric::Metric>(
160        &mut self,
161        metric: M,
162    ) -> Result<Formula<M::QuantityType>, Error> {
163        Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
164            &self.graph,
165            metric,
166            self.instructions_tx.clone(),
167        )?)))
168    }
169
170    /// Returns a receiver that streams samples for the given `metric` for the
171    /// given component ID.
172    pub fn component<M: metric::Metric>(
173        &mut self,
174        component_id: u64,
175        metric: M,
176    ) -> Result<Formula<M::QuantityType>, Error> {
177        Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
178            &self.graph,
179            metric,
180            self.instructions_tx.clone(),
181            component_id,
182        )?)))
183    }
184}
185
186#[cfg(test)]
187mod tests {
188    use chrono::TimeDelta;
189    use tokio_stream::{StreamExt, wrappers::BroadcastStream};
190
191    use crate::{
192        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
193        client::test_utils::{
194            MockComponent,
195            MockMicrogridApiClient, //
196        },
197        logical_meter::formula::Formula,
198        quantity::Quantity,
199    };
200
201    async fn new_logical_meter_handle() -> LogicalMeterHandle {
202        let api_client = MockMicrogridApiClient::new(
203            // Grid connection point
204            MockComponent::grid(1).with_children(vec![
205                // Main meter
206                MockComponent::meter(2)
207                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
208                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
209                    .with_children(vec![
210                        // PV meter
211                        MockComponent::meter(3)
212                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
213                            .with_children(vec![
214                                // PV inverter
215                                MockComponent::pv_inverter(4),
216                            ]),
217                        // Battery meter
218                        MockComponent::meter(5).with_children(vec![
219                            // Battery inverter
220                            MockComponent::battery_inverter(6)
221                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
222                                .with_children(vec![
223                                    // Battery
224                                    MockComponent::battery(7),
225                                ]),
226                            // Battery inverter
227                            MockComponent::battery_inverter(8)
228                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
229                                .with_children(vec![
230                                    // Battery
231                                    MockComponent::battery(9),
232                                ]),
233                        ]),
234                        // Consumer meter
235                        MockComponent::meter(10)
236                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
237                        // Chp meter
238                        MockComponent::meter(11).with_children(vec![
239                            // Chp
240                            MockComponent::chp(12),
241                        ]),
242                        // Ev charger meter
243                        MockComponent::meter(13).with_children(vec![
244                            // Ev chargers
245                            MockComponent::ev_charger(14),
246                            MockComponent::ev_charger(15),
247                        ]),
248                    ]),
249            ]),
250        );
251
252        LogicalMeterHandle::try_new(
253            MicrogridClientHandle::new_from_client(api_client),
254            LogicalMeterConfig {
255                resampling_interval: TimeDelta::try_seconds(1).unwrap(),
256            },
257        )
258        .await
259        .unwrap()
260    }
261
262    #[tokio::test]
263    async fn test_formula_display() {
264        let mut lm = new_logical_meter_handle().await;
265
266        let formula = lm.grid(crate::metric::AcPowerActive).unwrap();
267        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
268
269        let formula = lm.battery(None, crate::metric::AcPowerReactive).unwrap();
270        assert_eq!(
271            formula.to_string(),
272            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
273        );
274
275        let formula = lm
276            .battery(Some([9].into()), crate::metric::AcPowerActive)
277            .unwrap();
278        assert_eq!(
279            formula.to_string(),
280            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
281        );
282
283        let formula = lm
284            .battery(Some([7].into()), crate::metric::AcVoltage)
285            .unwrap();
286        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");
287
288        let formula = lm.battery(None, crate::metric::AcFrequency).unwrap();
289        assert_eq!(
290            formula.to_string(),
291            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
292        );
293
294        let formula = lm.pv(None, crate::metric::AcPowerReactive).unwrap();
295        assert_eq!(
296            formula.to_string(),
297            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
298        );
299
300        let formula = lm.chp(None, crate::metric::AcPowerActive).unwrap();
301        assert_eq!(
302            formula.to_string(),
303            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
304        );
305
306        let formula = lm.ev_charger(None, crate::metric::AcCurrent).unwrap();
307        assert_eq!(
308            formula.to_string(),
309            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
310        );
311
312        let formula = lm.consumer(crate::metric::AcCurrent).unwrap();
313        assert_eq!(
314            formula.to_string(),
315            concat!(
316                "METRIC_AC_CURRENT::(MAX(",
317                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
318                "- #10 - COALESCE(#11, #12, 0.0)",
319                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
320                " 0.0)",
321                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
322                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
323                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
324                ")"
325            )
326        );
327
328        let formula = lm.producer(crate::metric::AcPowerActive).unwrap();
329        assert_eq!(
330            formula.to_string(),
331            concat!(
332                "METRIC_AC_POWER_ACTIVE::(",
333                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
334                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
335                ")"
336            )
337        );
338
339        let formula = lm.component(10, crate::metric::AcCurrent).unwrap();
340        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
341    }
342
343    #[tokio::test(start_paused = true)]
344    async fn test_grid_power_formula() {
345        let formula = new_logical_meter_handle()
346            .await
347            .grid(crate::metric::AcPowerActive)
348            .unwrap();
349
350        let samples = fetch_samples(formula, 10).await;
351
352        check_samples(
353            samples,
354            |q| q.as_watts(),
355            vec![
356                Some(5.8),
357                Some(6.0),
358                Some(6.0),
359                Some(7.0),
360                Some(5.8),
361                Some(6.0),
362                Some(6.0),
363                Some(7.0),
364                Some(5.8),
365                Some(6.0),
366            ],
367        )
368    }
369
370    #[tokio::test(start_paused = true)]
371    async fn test_pv_reactive_power_formula() {
372        let formula = new_logical_meter_handle()
373            .await
374            .pv(None, crate::metric::AcPowerReactive)
375            .unwrap();
376
377        let samples = fetch_samples(formula, 10).await;
378
379        check_samples(
380            samples,
381            |q| q.as_volt_amperes_reactive(),
382            vec![
383                Some(-1.4),
384                Some(-0.5),
385                Some(-0.5),
386                Some(4.0),
387                Some(-1.4),
388                Some(-0.5),
389                Some(-0.5),
390                Some(4.0),
391                Some(-1.4),
392                Some(-0.5),
393            ],
394        )
395    }
396
397    #[tokio::test(start_paused = true)]
398    async fn test_battery_voltage_formula() {
399        let formula = new_logical_meter_handle()
400            .await
401            .battery(None, crate::metric::AcVoltage)
402            .unwrap();
403
404        let samples = fetch_samples(formula, 10).await;
405        check_samples(
406            samples,
407            |q| q.as_volts(),
408            vec![
409                Some(398.0),
410                Some(397.67),
411                Some(397.67),
412                Some(396.0),
413                Some(398.0),
414                Some(397.67),
415                Some(397.67),
416                Some(396.0),
417                Some(398.0),
418                Some(397.67),
419            ],
420        )
421    }
422
423    #[tokio::test(start_paused = true)]
424    async fn test_consumer_current_formula() {
425        let formula = new_logical_meter_handle()
426            .await
427            .consumer(crate::metric::AcCurrent)
428            .unwrap();
429
430        let samples = fetch_samples(formula, 10).await;
431        check_samples(
432            samples,
433            |q| q.as_amperes(),
434            vec![
435                Some(15.0),
436                Some(14.75),
437                Some(14.75),
438                Some(13.5),
439                Some(15.0),
440                Some(14.75),
441                Some(14.75),
442                Some(13.5),
443                Some(15.0),
444                Some(14.75),
445            ],
446        )
447    }
448
449    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
450        let rx = formula.subscribe().await.unwrap();
451
452        BroadcastStream::new(rx)
453            .take(num_values)
454            .map(|x| x.unwrap())
455            .collect()
456            .await
457    }
458
459    #[track_caller]
460    fn check_samples<Q: Quantity>(
461        samples: Vec<Sample<Q>>,
462        extractor: impl Fn(Q) -> f32,
463        expected_values: Vec<Option<f32>>,
464    ) {
465        let values = samples
466            .iter()
467            .map(|res| res.value().map(|v| extractor(v)))
468            .collect::<Vec<_>>();
469
470        let one_second = TimeDelta::try_seconds(1).unwrap();
471
472        samples.as_slice().windows(2).for_each(|w| {
473            assert_eq!(w[1].timestamp() - w[0].timestamp(), one_second);
474        });
475
476        for (v, ev) in values.iter().zip(expected_values.iter()) {
477            match (v, ev) {
478                (Some(v), Some(ev)) => assert!(
479                    (v - ev).abs() < 0.01,
480                    "expected value {ev:?}, got value {v:?}"
481                ),
482                (None, None) => {}
483                _ => panic!("expected value {ev:?}, got value {v:?}"),
484            }
485        }
486    }
487}