1use crate::logical_meter::formula::Formula;
5use crate::logical_meter::formula::graph_formula_provider::GraphFormulaProvider;
6use crate::{
7 client::MicrogridClientHandle,
8 error::Error,
9 metric,
10 proto::common::microgrid::electrical_components::{
11 ElectricalComponent, ElectricalComponentConnection,
12 },
13};
14use frequenz_microgrid_component_graph::{self, ComponentGraph};
15use std::collections::BTreeSet;
16use tokio::sync::mpsc;
17
18use super::{LogicalMeterConfig, logical_meter_actor::LogicalMeterActor};
19
20#[derive(Clone)]
22pub struct LogicalMeterHandle {
23 instructions_tx: mpsc::Sender<super::logical_meter_actor::Instruction>,
24 graph: ComponentGraph<ElectricalComponent, ElectricalComponentConnection>,
25}
26
27impl LogicalMeterHandle {
28 pub async fn try_new(
30 client: MicrogridClientHandle,
31 config: LogicalMeterConfig,
32 ) -> Result<Self, Error> {
33 let (sender, receiver) = mpsc::channel(8);
34 let graph = ComponentGraph::try_new(
35 client.list_electrical_components(vec![], vec![]).await?,
36 client
37 .list_electrical_component_connections(vec![], vec![])
38 .await?,
39 frequenz_microgrid_component_graph::ComponentGraphConfig {
40 allow_component_validation_failures: true,
41 allow_unconnected_components: true,
42 allow_unspecified_inverters: false,
43 disable_fallback_components: false,
44 },
45 )
46 .map_err(|e| {
47 Error::component_graph_error(format!("Unable to create a component graph: {e}"))
48 })?;
49
50 let logical_meter = LogicalMeterActor::try_new(receiver, client, config)?;
51
52 tokio::task::spawn(async move {
53 logical_meter.run().await;
54 });
55
56 Ok(Self {
57 instructions_tx: sender,
58 graph,
59 })
60 }
61
62 pub fn grid<M: metric::Metric>(
65 &mut self,
66 metric: M,
67 ) -> Result<Formula<M::QuantityType>, Error> {
68 Ok(Formula::Subscriber(Box::new(M::FormulaType::grid(
69 &self.graph,
70 metric,
71 self.instructions_tx.clone(),
72 )?)))
73 }
74
75 pub fn battery<M: metric::Metric>(
80 &mut self,
81 component_ids: Option<BTreeSet<u64>>,
82 metric: M,
83 ) -> Result<Formula<M::QuantityType>, Error> {
84 Ok(Formula::Subscriber(Box::new(M::FormulaType::battery(
85 &self.graph,
86 metric,
87 self.instructions_tx.clone(),
88 component_ids,
89 )?)))
90 }
91
92 pub fn chp<M: metric::Metric>(
97 &mut self,
98 component_ids: Option<BTreeSet<u64>>,
99 metric: M,
100 ) -> Result<Formula<M::QuantityType>, Error> {
101 Ok(Formula::Subscriber(Box::new(M::FormulaType::chp(
102 &self.graph,
103 metric,
104 self.instructions_tx.clone(),
105 component_ids,
106 )?)))
107 }
108
109 pub fn pv<M: metric::Metric>(
114 &mut self,
115 component_ids: Option<BTreeSet<u64>>,
116 metric: M,
117 ) -> Result<Formula<M::QuantityType>, Error> {
118 Ok(Formula::Subscriber(Box::new(M::FormulaType::pv(
119 &self.graph,
120 metric,
121 self.instructions_tx.clone(),
122 component_ids,
123 )?)))
124 }
125
126 pub fn ev_charger<M: metric::Metric>(
132 &mut self,
133 component_ids: Option<BTreeSet<u64>>,
134 metric: M,
135 ) -> Result<Formula<M::QuantityType>, Error> {
136 Ok(Formula::Subscriber(Box::new(M::FormulaType::ev_charger(
137 &self.graph,
138 metric,
139 self.instructions_tx.clone(),
140 component_ids,
141 )?)))
142 }
143
144 pub fn consumer<M: metric::Metric>(
147 &mut self,
148 metric: M,
149 ) -> Result<Formula<M::QuantityType>, Error> {
150 Ok(Formula::Subscriber(Box::new(M::FormulaType::consumer(
151 &self.graph,
152 metric,
153 self.instructions_tx.clone(),
154 )?)))
155 }
156
157 pub fn producer<M: metric::Metric>(
160 &mut self,
161 metric: M,
162 ) -> Result<Formula<M::QuantityType>, Error> {
163 Ok(Formula::Subscriber(Box::new(M::FormulaType::producer(
164 &self.graph,
165 metric,
166 self.instructions_tx.clone(),
167 )?)))
168 }
169
170 pub fn component<M: metric::Metric>(
173 &mut self,
174 component_id: u64,
175 metric: M,
176 ) -> Result<Formula<M::QuantityType>, Error> {
177 Ok(Formula::Subscriber(Box::new(M::FormulaType::component(
178 &self.graph,
179 metric,
180 self.instructions_tx.clone(),
181 component_id,
182 )?)))
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use chrono::TimeDelta;
189 use tokio_stream::{StreamExt, wrappers::BroadcastStream};
190
191 use crate::{
192 LogicalMeterConfig, LogicalMeterHandle, MicrogridClientHandle, Sample,
193 client::test_utils::{
194 MockComponent,
195 MockMicrogridApiClient, },
197 logical_meter::formula::Formula,
198 quantity::Quantity,
199 };
200
201 async fn new_logical_meter_handle() -> LogicalMeterHandle {
202 let api_client = MockMicrogridApiClient::new(
203 MockComponent::grid(1).with_children(vec![
205 MockComponent::meter(2)
207 .with_power(vec![4.0, 5.0, 6.0, 7.0, 7.0, 7.0])
208 .with_current(vec![1.0, 1.5, 2.0, 2.5, 2.0, 1.5])
209 .with_children(vec![
210 MockComponent::meter(3)
212 .with_reactive_power(vec![-2.0, -5.0, -4.0, 1.0, 3.0, 4.0])
213 .with_children(vec![
214 MockComponent::pv_inverter(4),
216 ]),
217 MockComponent::meter(5).with_children(vec![
219 MockComponent::battery_inverter(6)
221 .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
222 .with_children(vec![
223 MockComponent::battery(7),
225 ]),
226 MockComponent::battery_inverter(8)
228 .with_voltage(vec![400.0, 400.0, 398.0, 396.0, 396.0, 396.0])
229 .with_children(vec![
230 MockComponent::battery(9),
232 ]),
233 ]),
234 MockComponent::meter(10)
236 .with_current(vec![14.5, 15.0, 16.0, 15.5, 14.0, 13.5]),
237 MockComponent::meter(11).with_children(vec![
239 MockComponent::chp(12),
241 ]),
242 MockComponent::meter(13).with_children(vec![
244 MockComponent::ev_charger(14),
246 MockComponent::ev_charger(15),
247 ]),
248 ]),
249 ]),
250 );
251
252 LogicalMeterHandle::try_new(
253 MicrogridClientHandle::new_from_client(api_client),
254 LogicalMeterConfig {
255 resampling_interval: TimeDelta::try_seconds(1).unwrap(),
256 },
257 )
258 .await
259 .unwrap()
260 }
261
262 #[tokio::test]
263 async fn test_formula_display() {
264 let mut lm = new_logical_meter_handle().await;
265
266 let formula = lm.grid(crate::metric::AcPowerActive).unwrap();
267 assert_eq!(formula.to_string(), "METRIC_AC_POWER_ACTIVE::(#2)");
268
269 let formula = lm.battery(None, crate::metric::AcPowerReactive).unwrap();
270 assert_eq!(
271 formula.to_string(),
272 "METRIC_AC_POWER_REACTIVE::(COALESCE(#8 + #6, #5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)))"
273 );
274
275 let formula = lm
276 .battery(Some([9].into()), crate::metric::AcPowerActive)
277 .unwrap();
278 assert_eq!(
279 formula.to_string(),
280 "METRIC_AC_POWER_ACTIVE::(COALESCE(#8, 0.0))"
281 );
282
283 let formula = lm
284 .battery(Some([7].into()), crate::metric::AcVoltage)
285 .unwrap();
286 assert_eq!(formula.to_string(), "METRIC_AC_VOLTAGE::(COALESCE(#5, #6))");
287
288 let formula = lm.battery(None, crate::metric::AcFrequency).unwrap();
289 assert_eq!(
290 formula.to_string(),
291 "METRIC_AC_FREQUENCY::(COALESCE(#5, #6, #8))"
292 );
293
294 let formula = lm.pv(None, crate::metric::AcPowerReactive).unwrap();
295 assert_eq!(
296 formula.to_string(),
297 "METRIC_AC_POWER_REACTIVE::(COALESCE(#4, #3, 0.0))"
298 );
299
300 let formula = lm.chp(None, crate::metric::AcPowerActive).unwrap();
301 assert_eq!(
302 formula.to_string(),
303 "METRIC_AC_POWER_ACTIVE::(COALESCE(#12, #11, 0.0))"
304 );
305
306 let formula = lm.ev_charger(None, crate::metric::AcCurrent).unwrap();
307 assert_eq!(
308 formula.to_string(),
309 "METRIC_AC_CURRENT::(COALESCE(#15 + #14, #13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)))"
310 );
311
312 let formula = lm.consumer(crate::metric::AcCurrent).unwrap();
313 assert_eq!(
314 formula.to_string(),
315 concat!(
316 "METRIC_AC_CURRENT::(MAX(",
317 "#2 - COALESCE(#3, #4, 0.0) - COALESCE(#5, COALESCE(#8, 0.0) + COALESCE(#6, 0.0)) ",
318 "- #10 - COALESCE(#11, #12, 0.0)",
319 " - COALESCE(#13, COALESCE(#15, 0.0) + COALESCE(#14, 0.0)),",
320 " 0.0)",
321 " + COALESCE(MAX(#3 - #4, 0.0), 0.0) + COALESCE(MAX(#5 - #6 - #8, 0.0), 0.0)",
322 " + MAX(#10, 0.0) + COALESCE(MAX(#11 - #12, 0.0), 0.0)",
323 " + COALESCE(MAX(#13 - #14 - #15, 0.0), 0.0)",
324 ")"
325 )
326 );
327
328 let formula = lm.producer(crate::metric::AcPowerActive).unwrap();
329 assert_eq!(
330 formula.to_string(),
331 concat!(
332 "METRIC_AC_POWER_ACTIVE::(",
333 "MIN(COALESCE(#4, #3, 0.0), 0.0)",
334 " + MIN(COALESCE(#12, #11, 0.0), 0.0)",
335 ")"
336 )
337 );
338
339 let formula = lm.component(10, crate::metric::AcCurrent).unwrap();
340 assert_eq!(formula.to_string(), "METRIC_AC_CURRENT::(#10)");
341 }
342
343 #[tokio::test(start_paused = true)]
344 async fn test_grid_power_formula() {
345 let formula = new_logical_meter_handle()
346 .await
347 .grid(crate::metric::AcPowerActive)
348 .unwrap();
349
350 let samples = fetch_samples(formula, 10).await;
351
352 check_samples(
353 samples,
354 |q| q.as_watts(),
355 vec![
356 Some(5.8),
357 Some(6.0),
358 Some(6.0),
359 Some(7.0),
360 Some(5.8),
361 Some(6.0),
362 Some(6.0),
363 Some(7.0),
364 Some(5.8),
365 Some(6.0),
366 ],
367 )
368 }
369
370 #[tokio::test(start_paused = true)]
371 async fn test_pv_reactive_power_formula() {
372 let formula = new_logical_meter_handle()
373 .await
374 .pv(None, crate::metric::AcPowerReactive)
375 .unwrap();
376
377 let samples = fetch_samples(formula, 10).await;
378
379 check_samples(
380 samples,
381 |q| q.as_volt_amperes_reactive(),
382 vec![
383 Some(-1.4),
384 Some(-0.5),
385 Some(-0.5),
386 Some(4.0),
387 Some(-1.4),
388 Some(-0.5),
389 Some(-0.5),
390 Some(4.0),
391 Some(-1.4),
392 Some(-0.5),
393 ],
394 )
395 }
396
397 #[tokio::test(start_paused = true)]
398 async fn test_battery_voltage_formula() {
399 let formula = new_logical_meter_handle()
400 .await
401 .battery(None, crate::metric::AcVoltage)
402 .unwrap();
403
404 let samples = fetch_samples(formula, 10).await;
405 check_samples(
406 samples,
407 |q| q.as_volts(),
408 vec![
409 Some(398.0),
410 Some(397.67),
411 Some(397.67),
412 Some(396.0),
413 Some(398.0),
414 Some(397.67),
415 Some(397.67),
416 Some(396.0),
417 Some(398.0),
418 Some(397.67),
419 ],
420 )
421 }
422
423 #[tokio::test(start_paused = true)]
424 async fn test_consumer_current_formula() {
425 let formula = new_logical_meter_handle()
426 .await
427 .consumer(crate::metric::AcCurrent)
428 .unwrap();
429
430 let samples = fetch_samples(formula, 10).await;
431 check_samples(
432 samples,
433 |q| q.as_amperes(),
434 vec![
435 Some(15.0),
436 Some(14.75),
437 Some(14.75),
438 Some(13.5),
439 Some(15.0),
440 Some(14.75),
441 Some(14.75),
442 Some(13.5),
443 Some(15.0),
444 Some(14.75),
445 ],
446 )
447 }
448
449 async fn fetch_samples<Q: Quantity>(formula: Formula<Q>, num_values: usize) -> Vec<Sample<Q>> {
450 let rx = formula.subscribe().await.unwrap();
451
452 BroadcastStream::new(rx)
453 .take(num_values)
454 .map(|x| x.unwrap())
455 .collect()
456 .await
457 }
458
459 #[track_caller]
460 fn check_samples<Q: Quantity>(
461 samples: Vec<Sample<Q>>,
462 extractor: impl Fn(Q) -> f32,
463 expected_values: Vec<Option<f32>>,
464 ) {
465 let values = samples
466 .iter()
467 .map(|res| res.value().map(|v| extractor(v)))
468 .collect::<Vec<_>>();
469
470 let one_second = TimeDelta::try_seconds(1).unwrap();
471
472 samples.as_slice().windows(2).for_each(|w| {
473 assert_eq!(w[1].timestamp() - w[0].timestamp(), one_second);
474 });
475
476 for (v, ev) in values.iter().zip(expected_values.iter()) {
477 match (v, ev) {
478 (Some(v), Some(ev)) => assert!(
479 (v - ev).abs() < 0.01,
480 "expected value {ev:?}, got value {v:?}"
481 ),
482 (None, None) => {}
483 _ => panic!("expected value {ev:?}, got value {v:?}"),
484 }
485 }
486 }
487}