frequenz_microgrid/logical_meter/formula/async_formula/
composition.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! Methods for composing formulas.
5
6use tokio::sync::broadcast;
7
8use crate::{
9    Error,
10    Sample,
11    logical_meter::formula::{
12        Formula,
13        FormulaSubscriber,
14        async_formula::FormulaOperand, //
15    },
16    quantity::Quantity, //
17};
18
19impl<Q> Formula<Q>
20where
21    Q: Quantity + 'static,
22{
23    pub fn coalesce(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
24        match self {
25            Formula::Coalesce(mut items) => {
26                items.push(other.into());
27                Ok(Formula::Coalesce(items))
28            }
29            _ => Ok(Formula::Coalesce(vec![
30                FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
31                other.into(),
32            ])),
33        }
34    }
35
36    pub fn min(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
37        match self {
38            Formula::Min(mut items) => {
39                items.push(other.into());
40                Ok(Formula::Min(items))
41            }
42            _ => Ok(Formula::Min(vec![
43                FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
44                other.into(),
45            ])),
46        }
47    }
48
49    pub fn max(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
50        match self {
51            Formula::Max(mut items) => {
52                items.push(other.into());
53                Ok(Formula::Max(items))
54            }
55            _ => Ok(Formula::Max(vec![
56                FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
57                other.into(),
58            ])),
59        }
60    }
61
62    pub fn avg(self, others: Vec<Formula<Q>>) -> Result<Formula<Q>, Error> {
63        let mut exprs: Vec<FormulaOperand<Q>> =
64            vec![FormulaOperand::Formula(Box::new(
65                Formula::<Q, Q, f32>::Subscriber(Box::new(self)),
66            ))];
67        for other in others {
68            exprs.push(other.into());
69        }
70        Ok(Formula::Avg(exprs))
71    }
72
73    pub async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
74        <Self as FormulaSubscriber>::subscribe(self).await
75    }
76}
77
78impl<Q, F> std::ops::Add<F> for Formula<Q>
79where
80    F: Into<FormulaOperand<Q>>,
81    Q: Quantity + 'static,
82{
83    type Output = Formula<Q>;
84
85    fn add(self, other: F) -> Self::Output {
86        Formula::Add(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
87    }
88}
89
90impl<Q, F> std::ops::Sub<F> for Formula<Q>
91where
92    F: Into<FormulaOperand<Q>>,
93    Q: Quantity + 'static,
94{
95    type Output = Formula<Q>;
96
97    fn sub(self, other: F) -> Self::Output {
98        Formula::Subtract(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
99    }
100}
101
102impl<Q> std::ops::Mul<f32> for Formula<Q, Q, f32>
103where
104    Q: Quantity + 'static,
105{
106    type Output = Formula<Q, Q, f32>;
107
108    fn mul(self, other: f32) -> Self::Output {
109        Formula::<Q, Q, f32>::Multiply(FormulaOperand::<Q>::Formula(Box::new(self)), other.into())
110    }
111}
112
113impl<Q> std::ops::Div<f32> for Formula<Q, Q, f32>
114where
115    Q: Quantity + 'static,
116{
117    type Output = Formula<Q, Q, f32>;
118
119    fn div(self, rhs: f32) -> Self::Output {
120        Formula::<Q, Q, f32>::Divide(FormulaOperand::<Q>::Formula(Box::new(self)), rhs.into())
121    }
122}
123
124#[cfg(test)]
125mod tests {
126    use super::*;
127    use crate::{client::test_utils::logging::capture_logs, quantity::Power};
128    use async_trait::async_trait;
129    use chrono::{DateTime, TimeDelta, Utc};
130
131    #[tokio::test]
132    async fn test_addition() {
133        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
134            + formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])
135            + formula(3, vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
136        assert_eq!(composed.to_string(), "((#1 + #2) + #3)");
137        assert_eq!(
138            collect_values(&composed, Power::as_watts).await,
139            vec![Some(13.0), Some(17.0), None, Some(29.0)]
140        );
141
142        let composed = formula(4, vec![None, Some(100.0), Some(-10.0), Some(5.0)]) + composed;
143        assert_eq!(composed.to_string(), "(#4 + ((#1 + #2) + #3))");
144        assert_eq!(
145            collect_values(&composed, Power::as_watts).await,
146            vec![None, Some(117.0), None, Some(34.0)]
147        );
148
149        let composed = formula(5, vec![None, Some(4.0), Some(-10.0), Some(-10.0)]) + composed;
150        assert_eq!(composed.to_string(), "(#5 + (#4 + ((#1 + #2) + #3)))");
151        assert_eq!(
152            collect_values(&composed, Power::as_watts).await,
153            vec![None, Some(121.0), None, Some(24.0)]
154        );
155
156        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
157            + Power::from_watts(5.0)
158            + formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
159        assert_eq!(composed.to_string(), "((#1 + 5 W) + #2)");
160        assert_eq!(
161            collect_values(&composed, Power::as_watts).await,
162            vec![Some(16.0), Some(19.0), None, Some(29.0)]
163        );
164    }
165
166    #[tokio::test]
167    async fn test_subtraction() {
168        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
169            - formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])
170            - formula(3, vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
171        assert_eq!(composed.to_string(), "((#1 - #2) - #3)");
172        assert_eq!(
173            collect_values(&composed, Power::as_watts).await,
174            vec![Some(7.0), Some(7.0), None, Some(11.0)]
175        );
176
177        let composed = formula(4, vec![None, Some(100.0), Some(-10.0), Some(5.0)]) - composed;
178        assert_eq!(composed.to_string(), "(#4 - ((#1 - #2) - #3))");
179        assert_eq!(
180            collect_values(&composed, Power::as_watts).await,
181            vec![None, Some(93.0), None, Some(-6.0)]
182        );
183
184        let composed = formula(5, vec![None, Some(4.0), Some(-10.0), Some(-10.0)]) - composed;
185        assert_eq!(composed.to_string(), "(#5 - (#4 - ((#1 - #2) - #3)))");
186        assert_eq!(
187            collect_values(&composed, Power::as_watts).await,
188            vec![None, Some(-89.0), None, Some(-4.0)]
189        );
190
191        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
192            - Power::from_watts(5.0)
193            - formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
194        assert_eq!(composed.to_string(), "((#1 - 5 W) - #2)");
195        assert_eq!(
196            collect_values(&composed, Power::as_watts).await,
197            vec![Some(4.0), Some(5.0), None, Some(11.0)]
198        );
199    }
200
201    #[tokio::test]
202    async fn test_multiplication() {
203        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) * 2.0;
204        assert_eq!(composed.to_string(), "(#1 * 2)");
205        assert_eq!(
206            collect_values(&composed, Power::as_watts).await,
207            vec![Some(20.0), Some(24.0), None, Some(40.0)]
208        );
209    }
210
211    #[tokio::test]
212    async fn test_division() {
213        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) / 2.0;
214        assert_eq!(composed.to_string(), "(#1 / 2)");
215        assert_eq!(
216            collect_values(&composed, Power::as_watts).await,
217            vec![Some(5.0), Some(6.0), None, Some(10.0)]
218        );
219
220        let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) / 0.0;
221        assert_eq!(composed.to_string(), "(#1 / 0)");
222        assert_eq!(
223            collect_values(&composed, Power::as_watts).await,
224            vec![None, None, None, None]
225        );
226    }
227
228    #[tokio::test]
229    async fn test_coalesce() {
230        let composed = formula(1, vec![None, Some(12.0), None, Some(20.0)])
231            .coalesce(formula(2, vec![Some(1.0), None, None, Some(4.0)]))
232            .unwrap()
233            .coalesce(formula(3, vec![Some(2.0), Some(3.0), Some(8.0), None]))
234            .unwrap();
235
236        assert_eq!(composed.to_string(), "COALESCE(#1, #2, #3)");
237        assert_eq!(
238            collect_values(&composed, Power::as_watts).await,
239            vec![Some(1.0), Some(12.0), Some(8.0), Some(20.0)]
240        );
241
242        let composed = formula(1, vec![None, Some(12.0), None, Some(20.0)])
243            .coalesce(Power::from_watts(5.0))
244            .unwrap();
245        assert_eq!(composed.to_string(), "COALESCE(#1, 5 W)");
246        assert_eq!(
247            collect_values(&composed, Power::as_watts).await,
248            vec![Some(5.0), Some(12.0), Some(5.0), Some(20.0)]
249        );
250    }
251
252    #[tokio::test]
253    async fn test_min() {
254        let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
255            .min(formula(
256                2,
257                vec![Some(5.0), Some(15.0), Some(3.0), Some(25.0)],
258            ))
259            .unwrap()
260            .min(formula(
261                3,
262                vec![Some(8.0), Some(11.0), Some(4.0), Some(18.0)],
263            ))
264            .unwrap();
265
266        assert_eq!(composed.to_string(), "MIN(#1, #2, #3)");
267        assert_eq!(
268            collect_values(&composed, Power::as_watts).await,
269            vec![Some(5.0), Some(1.0), None, Some(18.0)]
270        );
271
272        let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
273            .min(Power::from_watts(15.0))
274            .unwrap();
275        assert_eq!(composed.to_string(), "MIN(#1, 15 W)");
276        assert_eq!(
277            collect_values(&composed, Power::as_watts).await,
278            vec![Some(10.0), Some(1.0), None, Some(15.0)]
279        );
280    }
281
282    #[tokio::test]
283    async fn test_max() {
284        let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
285            .max(formula(
286                2,
287                vec![Some(5.0), Some(15.0), Some(3.0), Some(25.0)],
288            ))
289            .unwrap()
290            .max(formula(
291                3,
292                vec![Some(8.0), Some(21.0), Some(4.0), Some(18.0)],
293            ))
294            .unwrap();
295
296        assert_eq!(composed.to_string(), "MAX(#1, #2, #3)");
297        assert_eq!(
298            collect_values(&composed, Power::as_watts).await,
299            vec![Some(10.0), Some(21.0), None, Some(25.0)]
300        );
301
302        let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
303            .max(Power::from_watts(15.0))
304            .unwrap();
305        assert_eq!(composed.to_string(), "MAX(#1, 15 W)");
306        assert_eq!(
307            collect_values(&composed, Power::as_watts).await,
308            vec![Some(15.0), Some(15.0), None, Some(20.0)]
309        );
310    }
311
312    #[tokio::test]
313    async fn test_avg() {
314        let composed = formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0), None])
315            .avg(vec![
316                formula(2, vec![Some(20.0), Some(30.0), Some(40.0), None, None]),
317                formula(3, vec![Some(30.0), Some(40.0), Some(50.0), None, None]),
318            ])
319            .unwrap();
320
321        assert_eq!(composed.to_string(), "AVG(#1, #2, #3)");
322        assert_eq!(
323            collect_values(&composed, Power::as_watts).await,
324            vec![Some(20.0), Some(30.0), Some(45.0), Some(30.0), None]
325        );
326    }
327
328    #[tokio::test]
329    async fn test_mixed_operations() {
330        let composed = (formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0)])
331            + formula(2, vec![Some(5.0), Some(15.0), Some(25.0), Some(35.0)]))
332        .max(formula(
333            3,
334            vec![Some(12.0), Some(48.0), Some(22.0), Some(40.0)],
335        ))
336        .unwrap()
337            / 2.0;
338
339        assert_eq!(composed.to_string(), "(MAX((#1 + #2), #3) / 2)");
340        assert_eq!(
341            collect_values(&composed, Power::as_watts).await,
342            vec![Some(7.5), Some(24.0), None, Some(32.5)]
343        );
344
345        let composed = formula(1, vec![Some(100.0), Some(200.0), None, Some(300.0)])
346            .coalesce(formula(2, vec![Some(50.0), None, Some(25.0), Some(150.0)]))
347            .unwrap()
348            .min(formula(
349                3,
350                vec![Some(80.0), Some(180.0), Some(30.0), Some(250.0)],
351            ))
352            .unwrap();
353        assert_eq!(composed.to_string(), "MIN(COALESCE(#1, #2), #3)");
354        assert_eq!(
355            collect_values(&composed, Power::as_watts).await,
356            vec![Some(80.0), Some(180.0), Some(25.0), Some(250.0)]
357        );
358
359        let composed = formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0)])
360            + formula(2, vec![Some(5.0), Some(15.0), Some(25.0), Some(35.0)])
361            - formula(3, vec![Some(3.0), Some(13.0), Some(23.0), Some(33.0)]) * 2.0;
362        assert_eq!(composed.to_string(), "((#1 + #2) - (#3 * 2))");
363        assert_eq!(
364            collect_values(&composed, Power::as_watts).await,
365            vec![Some(9.0), Some(9.0), None, Some(-1.0)]
366        );
367    }
368
369    #[tokio::test]
370    async fn test_out_of_sync_handling() {
371        let composed = formula(1, vec![Some(10.0), Some(20.0), Some(30.0), Some(40.0)])
372            + formula_out_of_sync(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
373
374        assert_eq!(composed.to_string(), "(#1 + #2)");
375        assert_eq!(
376            collect_values(&composed, Power::as_watts).await,
377            // skips first message of receiver with older data.
378            vec![Some(21.0), Some(32.0), Some(43.0)]
379        );
380    }
381
382    #[tokio::test]
383    async fn test_never_sync_handling() {
384        let composed = formula(1, vec![Some(10.0), Some(20.0), Some(30.0), Some(40.0)])
385            + formula_never_sync(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
386
387        assert_eq!(composed.to_string(), "(#1 + #2)");
388        let (collected_values, logs) =
389            capture_logs(|| collect_values(&composed, Power::as_watts)).await;
390        assert_eq!(
391            collected_values,
392            // sends nothing because the receivers could not be synchronized.
393            vec![]
394        );
395        assert_eq!(logs.len(), 1);
396        assert_eq!(
397            logs[0],
398            concat!(
399                "ERROR frequenz_microgrid::logical_meter::formula::async_formula: ",
400                "Couldn't synchronize receivers: ConnectionFailure: Could not receive sample: ",
401                "channel closed.  Stopping processing."
402            )
403        );
404    }
405
406    fn formula(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power> {
407        Formula::<Power>::Subscriber(Box::new(MockFormulaSubscriber::new(
408            DateTime::<Utc>::UNIX_EPOCH,
409            format!("#{comp_id}"),
410            values,
411        )))
412    }
413
414    fn formula_out_of_sync(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power, Power, f32> {
415        Formula::<Power, Power, f32>::Subscriber(Box::new(MockFormulaSubscriber::new(
416            DateTime::<Utc>::UNIX_EPOCH + TimeDelta::milliseconds(200),
417            format!("#{comp_id}"),
418            values,
419        )))
420    }
421
422    fn formula_never_sync(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power, Power, f32> {
423        Formula::<Power, Power, f32>::Subscriber(Box::new(MockFormulaSubscriber::new(
424            DateTime::<Utc>::UNIX_EPOCH + TimeDelta::milliseconds(100),
425            format!("#{comp_id}"),
426            values,
427        )))
428    }
429
430    async fn collect_values<T, F>(formula: &Formula<T>, extractor: F) -> Vec<Option<f32>>
431    where
432        T: Quantity,
433        F: Fn(&T) -> f32,
434    {
435        let mut rx = formula.subscribe().await.unwrap();
436        let mut values = Vec::new();
437        while let Ok(sample) = rx.recv().await {
438            values.push(sample.value().map(|v| extractor(&v)));
439        }
440        values
441    }
442
443    #[derive(Clone)]
444    struct MockFormulaSubscriber {
445        start_time: chrono::DateTime<Utc>,
446        formula: String,
447        values: Vec<Option<f32>>,
448    }
449
450    impl MockFormulaSubscriber {
451        fn new(
452            start_time: chrono::DateTime<Utc>,
453            formula: String,
454            values: Vec<Option<f32>>,
455        ) -> Self {
456            Self {
457                start_time,
458                formula,
459                values,
460            }
461        }
462    }
463
464    impl std::fmt::Display for MockFormulaSubscriber {
465        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
466            write!(f, "{}", self.formula)
467        }
468    }
469
470    #[async_trait]
471    impl FormulaSubscriber for MockFormulaSubscriber {
472        type QuantityType = Power;
473
474        async fn subscribe(
475            &self,
476        ) -> Result<broadcast::Receiver<Sample<Self::QuantityType>>, Error> {
477            let (tx, rx) = broadcast::channel(10);
478            for (idx, &value) in self.values.iter().enumerate() {
479                let sample = Sample::new(
480                    self.start_time + TimeDelta::new(0, (idx * 200_000_000) as u32).unwrap(),
481                    value.map(Power::from_watts),
482                );
483                let _ = tx.send(sample);
484            }
485            Ok(rx)
486        }
487    }
488}