1use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7 client::MicrogridClientHandle,
8 client::proto::common::microgrid::electrical_components::{
9 ElectricalComponent, ElectricalComponentConnection,
10 },
11 error::Error,
12 metric,
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph, ComponentGraphConfig};
15use std::collections::BTreeSet;
16use std::time::Duration;
17use tokio::sync::mpsc;
18
19use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
20
/// Cloneable handle used to request formulas from a logical meter.
///
/// The handle owns the sending half of the instruction channel whose receiving
/// half is given to the `LogicalMeterActor`, together with the component graph
/// that was built when the handle was created.
#[derive(Clone)]
pub struct LogicalMeterHandle {
    /// Sender for instructions to the actor; a clone is passed to every
    /// formula created through this handle.
    instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
    /// Component graph built at construction time; borrowed by the formula
    /// constructors and exposed through `graph()`.
    graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
}
27
28impl LogicalMeterHandle {
29 pub async fn try_new(
36 client: MicrogridClientHandle,
37 config: LogicalMeterConfig,
38 ) -> Result<Self, Error> {
39 Self::try_new_with_clock(client, config, crate::wall_clock_timer::SystemClock).await
40 }
41
42 pub(crate) async fn try_new_with_clock<C: crate::wall_clock_timer::Clock + 'static>(
43 client: MicrogridClientHandle,
44 config: LogicalMeterConfig,
45 clock: C,
46 ) -> Result<Self, Error> {
47 let (sender, receiver) = mpsc::channel(8);
48 const RETRY_DELAY: Duration = Duration::from_secs(3);
49 let graph = loop {
50 match build_component_graph(&client, &config.component_graph_config).await {
51 Ok(g) => break g,
52 Err(reason) => {
53 tracing::warn!(
54 "Microgrid logical-meter setup failed, retrying in {:?}: {reason}",
55 RETRY_DELAY
56 );
57 tokio::time::sleep(RETRY_DELAY).await;
58 }
59 }
60 };
61
62 let logical_meter = LogicalMeterActor::try_new(receiver, client, config, clock)?;
63
64 tokio::task::spawn(async move {
65 logical_meter.run().await;
66 });
67
68 Ok(Self {
69 instructions_tx: sender,
70 graph,
71 })
72 }
73
74 pub fn grid<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
77 Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
78 &self.graph,
79 self.instructions_tx.clone(),
80 )?)))
81 }
82
83 pub fn battery<M: metric::Metric>(
88 &self,
89 component_ids: Option<BTreeSet<u64>>,
90 ) -> Result<Formula<M::QuantityType>, Error> {
91 Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
92 &self.graph,
93 self.instructions_tx.clone(),
94 component_ids,
95 )?)))
96 }
97
98 pub fn chp<M: metric::Metric>(
103 &self,
104 component_ids: Option<BTreeSet<u64>>,
105 ) -> Result<Formula<M::QuantityType>, Error> {
106 Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
107 &self.graph,
108 self.instructions_tx.clone(),
109 component_ids,
110 )?)))
111 }
112
113 pub fn pv<M: metric::Metric>(
118 &self,
119 component_ids: Option<BTreeSet<u64>>,
120 ) -> Result<Formula<M::QuantityType>, Error> {
121 Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
122 &self.graph,
123 self.instructions_tx.clone(),
124 component_ids,
125 )?)))
126 }
127
128 pub fn ev_charger<M: metric::Metric>(
134 &self,
135 component_ids: Option<BTreeSet<u64>>,
136 ) -> Result<Formula<M::QuantityType>, Error> {
137 Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
138 &self.graph,
139 self.instructions_tx.clone(),
140 component_ids,
141 )?)))
142 }
143
144 pub fn consumer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
147 Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
148 &self.graph,
149 self.instructions_tx.clone(),
150 )?)))
151 }
152
153 pub fn producer<M: metric::Metric>(&self) -> Result<Formula<M::QuantityType>, Error> {
156 Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
157 &self.graph,
158 self.instructions_tx.clone(),
159 )?)))
160 }
161
162 pub fn component<M: metric::Metric>(
165 &self,
166 component_id: u64,
167 ) -> Result<Formula<M::QuantityType>, Error> {
168 Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
169 &self.graph,
170 self.instructions_tx.clone(),
171 component_id,
172 )?)))
173 }
174
175 pub fn graph(&self) -> &ComponentGraph<ElectricalComponent, ElectricalComponentConnection> {
177 &self.graph
178 }
179}
180
181async fn build_component_graph(
185 client: &MicrogridClientHandle,
186 config: &ComponentGraphConfig,
187) -> Result<ComponentGraph<ElectricalComponent, ElectricalComponentConnection>, String> {
188 let components = client
189 .list_electrical_components(vec![], vec![])
190 .await
191 .map_err(|e| format!("fetching components failed: {e}"))?;
192 let connections = client
193 .list_electrical_component_connections(vec![], vec![])
194 .await
195 .map_err(|e| format!("fetching component connections failed: {e}"))?;
196 ComponentGraph::try_new(components, connections, config.clone())
197 .map_err(|e| format!("building component graph failed: {e}"))
198}
199
#[cfg(test)]
mod tests {
    use chrono::TimeDelta;
    use frequenz_resampling::ResamplingFunction;
    use tokio_stream::{StreamExt, wrappers::BroadcastStream};

    use frequenz_microgrid_component_graph::ComponentGraphConfig;

    use crate::{
        LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
        client::test_utils::{MockComponent, MockMicrogridApiClient},
        logical_meter::formula::Formula,
        quantity::Quantity,
    };

    /// Builds a `LogicalMeterHandle` on top of a mock microgrid.
    ///
    /// The mock graph is: grid 1 -> meter 2, and below meter 2: meter 3 -> PV
    /// inverter 4; meter 5 -> battery inverters 6 and 8 (with batteries 7 and
    /// 9); meter 10 without children; meter 11 -> CHP 12; meter 13 -> EV
    /// chargers 14 and 15.
    ///
    /// When `config` is `None`, a config with a 1 second interval is used. The
    /// handle is driven by the mock client's clock.
    async fn new_logical_meter_handle(config: Option<LogicalMeterConfig>) -> LogicalMeterHandle {
        let api_client = MockMicrogridApiClient::new(
            MockComponent::grid(1).with_children(vec![
                MockComponent::meter(2)
                    .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
                    .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
                    .with_children(vec![
                        MockComponent::meter(3)
                            .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
                            .with_children(vec![
                                MockComponent::pv_inverter(4),
                            ]),
                        MockComponent::meter(5).with_children(vec![
                            MockComponent::battery_inverter(6)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![
                                    MockComponent::battery(7),
                                ]),
                            MockComponent::battery_inverter(8)
                                .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
                                .with_children(vec![
                                    MockComponent::battery(9),
                                ]),
                        ]),
                        MockComponent::meter(10)
                            .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
                        MockComponent::meter(11).with_children(vec![
                            MockComponent::chp(12),
                        ]),
                        MockComponent::meter(13).with_children(vec![
                            MockComponent::ev_charger(14),
                            MockComponent::ev_charger(15),
                        ]),
                    ]),
            ]),
        );

        let clock = api_client.clock();
        LogicalMeterHandle::try_new_with_clock(
            MicrogridClientHandle::new_from_client(api_client),
            config.unwrap_or_else(|| LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())),
            clock,
        )
        .await
        .unwrap()
    }

    /// Checks the string rendering of every formula kind exposed by the handle.
    ///
    /// The graph config disables `prefer_meters_in_component_formulas` and
    /// enables `include_phantom_loads_in_consumer_formula`.
    #[tokio::test]
    async fn test_formula_display() {
        let lm = new_logical_meter_handle(Some(
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())
                .with_component_graph_config(
                    ComponentGraphConfig::builder()
                        .prefer_meters_in_component_formulas(false)
                        .include_phantom_loads_in_consumer_formula(true)
                        .build(),
                ),
        ))
        .await;

        let formula = lm.grid::<crate::metric::AcPowerActive>().unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");

        let formula = lm.battery::<crate::metric::AcPowerReactive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
        );

        let formula = lm
            .battery::<crate::metric::AcPowerActive>(Some([9].into()))
            .unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
        );

        let formula = lm
            .battery::<crate::metric::AcVoltage>(Some([7].into()))
            .unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");

        let formula = lm.battery::<crate::metric::AcFrequency>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
        );

        let formula = lm.pv::<crate::metric::AcPowerReactive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
        );

        let formula = lm.chp::<crate::metric::AcPowerActive>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
        );

        let formula = lm.ev_charger::<crate::metric::AcCurrent>(None).unwrap();
        assert_eq!(
            formula.to_string(),
            "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
        );

        let formula = lm.consumer::<crate::metric::AcCurrent>().unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_CURRENT::(MAX(",
                "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
                "- #10 - COALESCE(#11, #12, 0.0)",
                " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
                " 0.0)",
                " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
                " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
                " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.producer::<crate::metric::AcPowerActive>().unwrap();
        assert_eq!(
            formula.to_string(),
            concat!(
                "METRIC_AC_POWER_ACTIVE::(",
                "MIN(COALESCE(#4, #3, 0.0), 0.0)",
                " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
                ")"
            )
        );

        let formula = lm.component::<crate::metric::AcCurrent>(10).unwrap();
        assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
    }

    /// Subscribes to the grid active-power formula with the default 1 second
    /// config and checks ten consecutive samples.
    #[tokio::test(start_paused = true)]
    async fn test_grid_power_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .grid::<crate::metric::AcPowerActive>()
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
                Some(6.0),
                Some(7.0),
                Some(5.8),
                Some(6.0),
            ],
        )
    }

    /// Subscribes to the PV reactive-power formula with the default 1 second
    /// config and checks ten consecutive samples.
    #[tokio::test(start_paused = true)]
    async fn test_pv_reactive_power_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .pv::<crate::metric::AcPowerReactive>(None)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;

        check_samples(
            samples,
            |q| q.as_volt_amperes_reactive(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
                Some(-0.5),
                Some(4.0),
                Some(-1.4),
                Some(-0.5),
            ],
        )
    }

    /// Subscribes to the battery voltage formula with the default 1 second
    /// config and checks ten consecutive samples.
    #[tokio::test(start_paused = true)]
    async fn test_battery_voltage_formula() {
        let formula = new_logical_meter_handle(None)
            .await
            .battery::<crate::metric::AcVoltage>(None)
            .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_volts(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
                Some(397.67),
                Some(396.0),
                Some(398.0),
                Some(397.67),
            ],
        )
    }

    /// Checks per-metric resampling function selection.
    ///
    /// The config uses a 200 ms interval, `Count` as the default resampling
    /// function and `Last` as an override for `AcVoltage`. The battery voltage
    /// formula is checked first, then the consumer active-power formula.
    #[tokio::test(start_paused = true)]
    async fn test_resampling_functions() {
        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_default_resampling_function(ResamplingFunction::Count)
                .override_resampling_function::<crate::metric::AcVoltage>(ResamplingFunction::Last),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let bat_volt_formula = lm.battery::<crate::metric::AcVoltage>(None).unwrap();

        let samples = fetch_samples(bat_volt_formula, 10).await;
        check_samples(
            samples,
            |q| q.as_volts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(400.0),
                Some(400.0),
                Some(398.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                Some(396.0),
                None,
                None,
            ],
        );

        let cons_pow_formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(cons_pow_formula, 10).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(2.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(2.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        );
    }

    /// Checks the effect of `with_max_age_in_intervals` on the consumer
    /// active-power formula when resampling with `Count` at a 200 ms interval.
    ///
    /// Two independent handles are created: one with a max age of 1 interval
    /// and one with a max age of 3 intervals.
    #[tokio::test(start_paused = true)]
    async fn test_max_age_in_intervals() {
        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_max_age_in_intervals(1)
                .with_default_resampling_function(ResamplingFunction::Count),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(formula, 8).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        );

        let lm_config = Some(
            LogicalMeterConfig::new(TimeDelta::try_milliseconds(200).unwrap())
                .with_max_age_in_intervals(3)
                .with_default_resampling_function(ResamplingFunction::Count),
        );
        let lm = new_logical_meter_handle(lm_config).await;
        let formula = lm.consumer::<crate::metric::AcPowerActive>().unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_watts(),
            TimeDelta::try_milliseconds(200).unwrap(),
            vec![
                Some(1.0),
                Some(2.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(3.0),
                Some(2.0),
                Some(1.0),
                Some(0.0),
                Some(0.0),
            ],
        )
    }

    /// Subscribes to the consumer current formula, with phantom loads included
    /// in the consumer formula, and checks ten consecutive samples at a
    /// 1 second interval.
    #[tokio::test(start_paused = true)]
    async fn test_consumer_current_formula() {
        let formula = new_logical_meter_handle(Some(
            LogicalMeterConfig::new(TimeDelta::try_seconds(1).unwrap())
                .with_component_graph_config(
                    ComponentGraphConfig::builder()
                        .include_phantom_loads_in_consumer_formula(true)
                        .build(),
                ),
        ))
        .await
        .consumer::<crate::metric::AcCurrent>()
        .unwrap();

        let samples = fetch_samples(formula, 10).await;
        check_samples(
            samples,
            |q| q.as_amperes(),
            TimeDelta::try_seconds(1).unwrap(),
            vec![
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
                Some(14.75),
                Some(13.5),
                Some(15.0),
                Some(14.75),
            ],
        )
    }

    /// Subscribes to `formula` and collects up to `num_values` samples.
    ///
    /// Panics if subscribing fails or if the stream yields an error item.
    async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
        let rx = formula.subscribe().await.unwrap();

        BroadcastStream::new(rx)
            .take(num_values)
            .map(|x| x.unwrap())
            .collect()
            .await
    }

    /// Asserts that `samples` match `expected_values` and are evenly spaced.
    ///
    /// * `extractor` converts a quantity into the `f32` that is compared.
    /// * `expected_interval` is the required timestamp difference between
    ///   every pair of consecutive samples.
    /// * `expected_values` holds one entry per sample; `None` means the
    ///   sample must carry no value, `Some(v)` means the extracted value must
    ///   be within `0.01` of `v`.
    ///
    /// Panics if the number of samples differs from the number of expected
    /// values. Thanks to `#[track_caller]`, failures are reported at the
    /// calling test.
    #[track_caller]
    fn check_samples<Q: Quantity>(
        samples: Vec<Sample<Q>>,
        extractor: impl Fn(Q) -> f32,
        expected_interval: TimeDelta,
        expected_values: Vec<Option<f32>>,
    ) {
        let values = samples
            .iter()
            .map(|res| res.value().map(&extractor))
            .collect::<Vec<_>>();

        // `zip` below stops at the shorter side, so without this check a
        // truncated stream (or a short expectation list) would pass silently.
        assert_eq!(
            values.len(),
            expected_values.len(),
            "Number of samples does not match the number of expected values"
        );

        // A plain loop (rather than a closure) keeps the assertion inside the
        // `#[track_caller]` function body, so the failure location is the caller.
        for w in samples.windows(2) {
            assert_eq!(
                w[1].timestamp() - w[0].timestamp(),
                expected_interval,
                "Samples are not spaced at the expected interval"
            );
        }

        for (id, (v, ev)) in values.iter().zip(expected_values.iter()).enumerate() {
            match (v, ev) {
                (Some(v), Some(ev)) => assert!(
                    (v - ev).abs() < 0.01,
                    "Item {id} - expected value {ev:?}, got value {v:?}"
                ),
                (None, None) => {}
                _ => panic!("Item {id} - expected value {ev:?}, got value {v:?}"),
            }
        }
    }
}