Skip to main content

frequenz_microgrid/logical_meter/
logical_meter_handle.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7    client::MicrogridClientHandle,
8    client::proto::common::microgrid::electrical_components::{
9        ElectricalComponent, ElectricalComponentConnection,
10    },
11    error::Error,
12    metric,
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph, ComponentGraphConfig};
15use std::collections::BTreeSet;
16use std::time::Duration;
17use tokio::sync::mpsc;
18
19use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
20
21/// This provides an interface  stream high-level metrics from a microgrid.
22#[derive(Clone)]
23pub struct LogicalMeterHandle {
24    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
25    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
26}
27
28impl LogicalMeterHandle {
29    /// Creates a new LogicalMeter instance.
30    ///
31    /// Listing the components and connections from the API and building the
32    /// component graph is retried indefinitely with a 3 second backoff, so
33    /// this call blocks until the server is reachable and returns data that
34    /// forms a valid graph.  Returns an error only if `config` is invalid.
35    pub async fn try_new(
36        client: MicrogridClientHandle,
37        config: LogicalMeterConfig,
38    ) -> Result<Self, Error> {
39        Self::try_new_with_clock(client, config, crate::wall_clock_timer::SystemClock).await
40    }
41
42    pub(crate) async fn try_new_with_clock<C: crate::wall_clock_timer::Clock + 'static>(
43        client: MicrogridClientHandle,
44        config: LogicalMeterConfig,
45        clock: C,
46    ) -> Result<Self, Error> {
47        let (sender, receiver) = mpsc::channel(8);
48        const RETRY_DELAY: Duration = Duration::from_secs(3);
49        let graph = loop {
50            match build_component_graph(&client, &config.component_graph_config).await {
51                Ok(g) => break g,
52                Err(reason) => {
53                    tracing::warn!(
54                        "Microgrid logical-meter setup failed, retrying in {:?}: {reason}",
55                        RETRY_DELAY
56                    );
57                    tokio::time::sleep(RETRY_DELAY).await;
58                }
59            }
60        };
61
62        let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?;
63
64        tokio::task::spawn(async move {
65            logical_meter.run().await;
66        });
67
68        Ok(Self {
69            instructions_tx: sender,
70            graph,
71        })
72    }
73
74    /// Returns a receiver that streams samples for the given `metric` at the grid
75    /// connection point.
76    pub fn grid<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
77        Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
78            &self.graph,
79            self.instructions_tx.clone(),
80        )?)))
81    }
82
83    /// Returns a receiver that streams samples for the given `metric` for the
84    /// given battery IDs.
85    ///
86    /// When `component_ids` is `None`, all batteries in the microgrid are used.
87    pub fn battery<M: metric::Metric>(
88        &self,
89        component_ids: Option<BTreeSet<u64>>,
90    ) -> Result<Formula<M::QuantityType>, Error> {
91        Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
92            &self.graph,
93            self.instructions_tx.clone(),
94            component_ids,
95        )?)))
96    }
97
98    /// Returns a receiver that streams samples for the given `metric` for the
99    /// given CHP IDs.
100    ///
101    /// When `component_ids` is `None`, all CHPs in the microgrid are used.
102    pub fn chp<M: metric::Metric>(
103        &self,
104        component_ids: Option<BTreeSet<u64>>,
105    ) -> Result<Formula<M::QuantityType>, Error> {
106        Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
107            &self.graph,
108            self.instructions_tx.clone(),
109            component_ids,
110        )?)))
111    }
112
113    /// Returns a receiver that streams samples for the given `metric` for the
114    /// given PV IDs.
115    ///
116    /// When `component_ids` is `None`, all PVs in the microgrid are used.
117    pub fn pv<M: metric::Metric>(
118        &self,
119        component_ids: Option<BTreeSet<u64>>,
120    ) -> Result<Formula<M::QuantityType>, Error> {
121        Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
122            &self.graph,
123            self.instructions_tx.clone(),
124            component_ids,
125        )?)))
126    }
127
128    /// Returns a receiver that streams samples for the given `metric` for the
129    /// given EV charger IDs.
130    ///
131    /// When `component_ids` is `None`, all EV chargers in the microgrid are
132    /// used.
133    pub fn ev_charger<M: metric::Metric>(
134        &self,
135        component_ids: Option<BTreeSet<u64>>,
136    ) -> Result<Formula<M::QuantityType>, Error> {
137        Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
138            &self.graph,
139            self.instructions_tx.clone(),
140            component_ids,
141        )?)))
142    }
143
144    /// Returns a receiver that streams samples for the given `metric` for the
145    /// logical `consumer` in the microgrid.
146    pub fn consumer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
147        Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
148            &self.graph,
149            self.instructions_tx.clone(),
150        )?)))
151    }
152
153    /// Returns a receiver that streams samples for the given `metric` for the
154    /// logical `producer` in the microgrid.
155    pub fn producer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
156        Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
157            &self.graph,
158            self.instructions_tx.clone(),
159        )?)))
160    }
161
162    /// Returns a receiver that streams samples for the given `metric` for the
163    /// given component ID.
164    pub fn component<M: metric::Metric>(
165        &self,
166        component_id: u64,
167    ) -> Result<Formula<M::QuantityType>, Error> {
168        Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
169            &self.graph,
170            self.instructions_tx.clone(),
171            component_id,
172        )?)))
173    }
174
175    /// Returns a reference to the component graph.
176    pub fn graph(&self) -> &ComponentGraph<ElectricalComponent, ElectricalComponentConnection> {
177        &self.graph
178    }
179}
180
181/// Lists the components and connections from the API and builds the
182/// component graph.  Errors from each step are stringified with a prefix so
183/// the retry loop can log a concise reason.
184async fn build_component_graph(
185    client: &MicrogridClientHandle,
186    config: &ComponentGraphConfig,
187) -> Result<ComponentGraph<ElectricalComponent, ElectricalComponentConnection>, String> {
188    let components = client
189        .list_electrical_components(vec![], vec![])
190        .await
191        .map_err(|e| format!("fetching components failed: {e}"))?;
192    let connections = client
193        .list_electrical_component_connections(vec![], vec![])
194        .await
195        .map_err(|e| format!("fetching component connections failed: {e}"))?;
196    ComponentGraph::try_new(components, connections, config.clone())
197        .map_err(|e| format!("building component graph failed: {e}"))
198}
199
#[cfg(test)]
mod tests {
    use chrono::TimeDelta;
    use frequenz_resampling::ResamplingFunction;
    use tokio_stream::{StreamExt, wrappers::BroadcastStream};

    use frequenz_microgrid_component_graph::ComponentGraphConfig;

    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
        client::test_utils::{
            MockComponent,
            MockMicrogridApiClient, //
        },
        logical_meter::formula::Formula,
        quantity::Quantity,
    };

    /// Builds a `LogicalMeterHandle` on top of a `MockMicrogridApiClient`.
    ///
    /// The mock microgrid has this layout (component IDs in parentheses):
    ///
    /// - grid connection point (1)
    ///   - main meter (2), with power and current data
    ///     - PV meter (3), with reactive power data → PV inverter (4)
    ///     - battery meter (5) → battery inverters (6, 8), with voltage
    ///       data → batteries (7, 9)
    ///     - consumer meter (10), with current data
    ///     - CHP meter (11) → CHP (12)
    ///     - EV charger meter (13) → EV chargers (14, 15)
    ///
    /// When `config` is `None`, a config created from a 1 second `TimeDelta`
    /// is used.  The handle uses the clock provided by the mock client.
    async fn new_logical_meter_handle(config: Option<LogicalMeterConfig>) -> LogicalMeterHandle {
        let api_client = MockMicrogridApiClient::new(
            // Grid connection point
            MockComponent::grid(1).with_children(vec![
                // Main meter
                MockComponent::meter(2)
                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
                    .with_children(vec![
                        // PV meter
                        MockComponent::meter(3)
                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
                            .with_children(vec![
                                // PV inverter
                                MockComponent::pv_inverter(4),
                            ]),
                        // Battery meter
                        MockComponent::meter(5).with_children(vec![
                            // Battery inverter
                            MockComponent::battery_inverter(6)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![
                                    // Battery
                                    MockComponent::battery(7),
                                ]),
                            // Battery inverter
                            MockComponent::battery_inverter(8)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![
                                    // Battery
                                    MockComponent::battery(9),
                                ]),
                        ]),
                        // Consumer meter
                        MockComponent::meter(10)
                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
                        // Chp meter
                        MockComponent::meter(11).with_children(vec![
                            // Chp
                            MockComponent::chp(12),
                        ]),
                        // Ev charger meter
                        MockComponent::meter(13).with_children(vec![
                            // Ev chargers
                            MockComponent::ev_charger(14),
                            MockComponent::ev_charger(15),
                        ]),
                    ]),
            ]),
        );

        let clock = api_client.clock();
        LogicalMeterHandle::try_new_with_clock(
            MicrogridClientHandle::new_from_client(api_client),
            config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())),
            clock,
        )
        .await
        .unwrap()
    }

    /// Checks the string representation of the formulas generated for each
    /// accessor of the handle against the mock microgrid.
    #[tokio::test]
    async fn test_formula_display() {
        let lm = new_logical_meter_handle(Some(
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())
                .with_component_graph_config(
                    ComponentGraphConfig::builder()
                        .prefer_meters_in_component_formulas(false)
                        .include_phantom_loads_in_consumer_formula(true)
                        .build(),
                ),
        ))
        .await;

        let formula = lm.grid::<crate::metric::AcPowerActive>().unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");

        let formula = lm.battery::<crate::metric::AcPowerReactive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
        );

        let formula = lm
            .battery::<crate::metric::AcPowerActive>(Some([9].into()))
            .unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
        );

        let formula = lm
            .battery::<crate::metric::AcVoltage>(Some([7].into()))
            .unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");

        let formula = lm.battery::<crate::metric::AcFrequency>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
        );

        let formula = lm.pv::<crate::metric::AcPowerReactive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
        );

        let formula = lm.chp::<crate::metric::AcPowerActive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
        );

        let formula = lm.ev_charger::<crate::metric::AcCurrent>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
        );

        let formula = lm.consumer::<crate::metric::AcCurrent>().unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_CURRENT::(MAX(",
                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
                "- #10 - COALESCE(#11, #12, 0.0)",
                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
                " 0.0)",
                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.producer::<crate::metric::AcPowerActive>().unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_POWER_ACTIVE::(",
                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.component::<crate::metric::AcCurrent>(10).unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
    }

    /// Streams 10 samples of the grid active power formula and checks their
    /// values and spacing.
    #[tokio::test(start_paused = true)]
    async fn test_grid_power_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .grid::<crate::metric::AcPowerActive>()
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
            ],
        )
    }

    /// Streams 10 samples of the PV reactive power formula and checks their
    /// values and spacing.
    #[tokio::test(start_paused = true)]
    async fn test_pv_reactive_power_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .pv::<crate::metric::AcPowerReactive>(None)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_volt_amperes_reactive(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
            ],
        )
    }

    /// Streams 10 samples of the battery voltage formula and checks their
    /// values and spacing.
    #[tokio::test(start_paused = true)]
    async fn test_battery_voltage_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .battery::<crate::metric::AcVoltage>(None)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_volts(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
            ],
        )
    }

    /// Checks the streamed values when the default resampling function is
    /// `Count` and the one for `AcVoltage` is overridden with `Last`.
    #[tokio::test(start_paused = true)]
    async fn test_resampling_functions() {
        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_default_resampling_function(ResamplingFunction::Count)
                .override_resampling_function::<crate::metric::AcVoltage>(ResamplingFunction::Last),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let bat_volt_formula = lm.battery::<crate::metric::AcVoltage>(None).unwrap();

        let samples = fetch_samples(bat_volt_formula, 10).await;
        check_samples(
            samples,
            |q| q.as_volts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(400.0),
                Some(400.0),
                Some(398.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                None,
                None,
            ],
        );

        let cons_pow_formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(cons_pow_formula, 10).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(2.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(2.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        );
    }

    /// Checks the streamed `Count` values with `max_age_in_intervals` set to
    /// 1 and then to 3.
    #[tokio::test(start_paused = true)]
    async fn test_max_age_in_intervals() {
        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_max_age_in_intervals(1)
                .with_default_resampling_function(ResamplingFunction::Count),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(formula, 8).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        );

        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_max_age_in_intervals(3)
                .with_default_resampling_function(ResamplingFunction::Count),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(2.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(2.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        )
    }

    /// Streams 10 samples of the consumer current formula, with phantom loads
    /// included in the consumer formula, and checks their values and spacing.
    #[tokio::test(start_paused = true)]
    async fn test_consumer_current_formula() {
        let formula = new_logical_meter_handle(Some(
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())
                .with_component_graph_config(
                    ComponentGraphConfig::builder()
                        .include_phantom_loads_in_consumer_formula(true)
                        .build(),
                ),
        ))
        .await
        .consumer::<crate::metric::AcCurrent>()
        .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_amperes(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
            ],
        )
    }

    /// Subscribes to `formula` and collects up to `num_values` samples from
    /// the resulting stream.
    ///
    /// Panics if subscribing fails or if the stream yields an error item.
    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
        let rx = formula.subscribe().await.unwrap();

        BroadcastStream::new(rx)
            .take(num_values)
            .map(|x| x.unwrap())
            .collect()
            .await
    }

    /// Asserts that `samples` match `expected_values` and are spaced exactly
    /// `expected_interval` apart.
    ///
    /// `extractor` converts each sample's quantity into an `f32`; values are
    /// compared with an absolute tolerance of `0.01`.  A `None` expectation
    /// only matches a sample without a value.
    ///
    /// Panics if the number of samples differs from the number of expected
    /// values, if two consecutive samples are not `expected_interval` apart,
    /// or if any value does not match its expectation.
    #[track_caller]
    fn check_samples<Q: Quantity>(
        samples: Vec<Sample<Q>>,
        extractor: impl Fn(Q) -> f32,
        expected_interval: TimeDelta,
        expected_values: Vec<Option<f32>>,
    ) {
        let values = samples
            .iter()
            .map(|res| res.value().map(&extractor))
            .collect::<Vec<_>>();

        // `zip` below stops at the shorter side, so without this check a
        // stream that produced too few (or too many) samples would pass.
        assert_eq!(
            values.len(),
            expected_values.len(),
            "Expected {} samples, got {}",
            expected_values.len(),
            values.len()
        );

        samples.as_slice().windows(2).for_each(|w| {
            assert_eq!(
                w[1].timestamp() - w[0].timestamp(),
                expected_interval,
                "Samples are not spaced at the expected interval"
            );
        });

        for (id, (v, ev)) in values.iter().zip(expected_values.iter()).enumerate() {
            match (v, ev) {
                (Some(v), Some(ev)) => assert!(
                    (v - ev).abs() < 0.01,
                    "Item {id} - expected value {ev:?}, got value {v:?}"
                ),
                (None, None) => {}
                _ => panic!("Item {id} - expected value {ev:?}, got value {v:?}"),
            }
        }
    }
}