frequenz_microgrid/logical_meter/formula/
async_formula.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! A nested formula that can contain other formulas.
5
6mod application_methods;
7mod composition;
8
9use async_trait::async_trait;
10use tokio::sync::broadcast::{self, error::RecvError};
11
12use crate::{Error, Sample, logical_meter::formula::FormulaSubscriber, quantity::Quantity};
13
/// A composable Formula.
///
/// Each variant describes one way of producing a stream of `QOut` samples.
/// The notation quoted on the variants below is the one produced by the
/// `Display` implementation of this type.
///
/// `QIn1` and `QIn2` are the operand quantity types of the two-operand
/// variants ([`Formula::Multiply`] and [`Formula::Divide`]); they default to
/// `QOut` and `f32` respectively.
pub enum Formula<QOut, QIn1 = QOut, QIn2 = f32>
where
    QOut: Quantity + 'static,
    QIn1: Quantity + 'static,
    QIn2: Quantity + 'static,
    QIn1: std::ops::Mul<QIn2, Output = QOut>,
{
    /// An existing formula subscriber yielding `QOut`; subscribing and
    /// displaying are forwarded to it.
    Subscriber(Box<dyn FormulaSubscriber<QuantityType = QOut>>),
    /// Displayed as `COALESCE(a, b, ...)`; evaluated with
    /// `application_methods::coalesce_samples`.
    Coalesce(Vec<FormulaOperand<QOut>>),
    /// Displayed as `MIN(a, b, ...)`; evaluated with
    /// `application_methods::min_samples`.
    Min(Vec<FormulaOperand<QOut>>),
    /// Displayed as `MAX(a, b, ...)`; evaluated with
    /// `application_methods::max_samples`.
    Max(Vec<FormulaOperand<QOut>>),
    /// Displayed as `AVG(a, b, ...)`; evaluated with
    /// `application_methods::avg_samples`.
    Avg(Vec<FormulaOperand<QOut>>),
    /// Displayed as `(a + b + ...)`; evaluated with
    /// `application_methods::add_samples`.
    Add(Vec<FormulaOperand<QOut>>),
    /// Displayed as `(a - b - ...)`; evaluated with
    /// `application_methods::subtract_samples`.
    Subtract(Vec<FormulaOperand<QOut>>),
    /// Displayed as `(lhs * rhs)`; evaluated with
    /// `application_methods::multiply_samples`.
    Multiply(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
    /// Displayed as `(lhs / rhs)`; evaluated with
    /// `application_methods::divide_samples`.
    Divide(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
}
32
/// A single operand of a [`Formula`].
pub enum FormulaOperand<Q: Quantity + 'static> {
    /// A formula that has not been subscribed to yet.  Subscribing the
    /// operand turns it into a [`FormulaOperand::Stream`]; receiving from it
    /// directly is reported as an error.
    Formula(Box<dyn FormulaSubscriber<QuantityType = Q>>),
    /// A sample receiver together with the name shown when the operand is
    /// displayed.
    Stream(broadcast::Receiver<crate::Sample<Q>>, String),
    /// A constant quantity; receiving from it yields the same value every
    /// time without waiting.
    Quantity(Q),
}
38
39impl<Q: Quantity + 'static> FormulaOperand<Q> {
40    async fn subscribe(&self) -> Result<FormulaOperand<Q>, Error> {
41        match self {
42            FormulaOperand::Formula(formula_subscriber) => Ok(FormulaOperand::Stream(
43                (*formula_subscriber).subscribe().await?,
44                formula_subscriber.to_string(),
45            )),
46            FormulaOperand::Stream(receiver, name) => {
47                Ok(FormulaOperand::Stream(receiver.resubscribe(), name.clone()))
48            }
49            FormulaOperand::Quantity(quantity) => Ok(FormulaOperand::Quantity(*quantity)),
50        }
51    }
52
53    async fn recv(&mut self) -> Result<FormulaValue<Q>, RecvError> {
54        match self {
55            FormulaOperand::Formula(..) => {
56                tracing::error!("Internal: FormulaItem::recv called on unsubscribed FormulaItem.");
57                Err(RecvError::Closed)
58            }
59            FormulaOperand::Stream(receiver, _) => match receiver.recv().await {
60                Ok(sample) => Ok(FormulaValue::Sample(sample)),
61                Err(e) => Err(e),
62            },
63            FormulaOperand::Quantity(q) => Ok(FormulaValue::Quantity(*q)),
64        }
65    }
66}
67
68impl<QOut, QIn1, QIn2> From<Formula<QOut, QIn1, QIn2>> for FormulaOperand<QOut>
69where
70    QOut: Quantity + 'static,
71    QIn1: Quantity + 'static,
72    QIn2: Quantity + 'static,
73    QOut: std::ops::Div<QIn2, Output = QIn1>,
74    QIn1: std::ops::Mul<QIn2, Output = QOut> + std::ops::Div<QIn2, Output = QOut>,
75{
76    fn from(formula: Formula<QOut, QIn1, QIn2>) -> Self {
77        FormulaOperand::Formula(Box::new(formula))
78    }
79}
80
81impl<Q> From<(broadcast::Receiver<crate::Sample<Q>>, String)> for FormulaOperand<Q>
82where
83    Q: Quantity + 'static,
84{
85    fn from(value: (broadcast::Receiver<crate::Sample<Q>>, String)) -> Self {
86        FormulaOperand::Stream(value.0, value.1)
87    }
88}
89
90impl<Q> From<Q> for FormulaOperand<Q>
91where
92    Q: Quantity + 'static,
93{
94    fn from(quantity: Q) -> Self {
95        FormulaOperand::Quantity(quantity)
96    }
97}
98
99impl<Q: Quantity + std::fmt::Display> std::fmt::Display for FormulaOperand<Q> {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            FormulaOperand::Formula(formula) => write!(f, "{formula}"),
103            FormulaOperand::Stream(_, name) => write!(f, "{name}"),
104            FormulaOperand::Quantity(q) => write!(f, "{q}"),
105        }
106    }
107}
108
/// A value received from a `FormulaOperand`.
#[derive(Debug)]
enum FormulaValue<Q: Quantity> {
    /// A sample received from a stream operand.
    Sample(crate::Sample<Q>),
    /// The value of a constant operand.
    Quantity(Q),
}
114
115fn format_exprs<Q: Quantity + std::fmt::Display>(
116    f: &mut std::fmt::Formatter<'_>,
117    exprs: &[FormulaOperand<Q>],
118    prefix: &str,
119    sep: &str,
120) -> std::fmt::Result {
121    write!(f, "{prefix}(")?;
122    for (i, expr) in exprs.iter().enumerate() {
123        if i > 0 {
124            write!(f, "{sep}")?;
125        }
126        write!(f, "{expr}")?;
127    }
128    write!(f, ")")
129}
130
131impl<QOut, QIn1, QIn2> std::fmt::Display for Formula<QOut, QIn1, QIn2>
132where
133    QOut: Quantity,
134    QIn1: Quantity,
135    QIn2: Quantity,
136    QIn1: std::ops::Mul<QIn2, Output = QOut>,
137{
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        match self {
140            Formula::Subscriber(formula) => formula.fmt(f),
141            Formula::Coalesce(exprs) => format_exprs(f, exprs, "COALESCE", ", "),
142            Formula::Min(exprs) => format_exprs(f, exprs, "MIN", ", "),
143            Formula::Max(exprs) => format_exprs(f, exprs, "MAX", ", "),
144            Formula::Avg(exprs) => format_exprs(f, exprs, "AVG", ", "),
145            Formula::Add(exprs) => format_exprs(f, exprs, "", " + "),
146            Formula::Subtract(exprs) => format_exprs(f, exprs, "", " - "),
147            Formula::Multiply(lhs, rhs) => write!(f, "({lhs} * {rhs})"),
148            Formula::Divide(lhs, rhs) => write!(f, "({lhs} / {rhs})"),
149        }
150    }
151}
152
153async fn synchronize_receivers<Q: Quantity>(
154    formula_items: &mut [FormulaOperand<Q>],
155) -> Result<Vec<FormulaValue<Q>>, crate::Error> {
156    let mut latest = vec![];
157    for item in formula_items.iter_mut() {
158        match item.recv().await {
159            Ok(vv) => latest.push(vv),
160            Err(_) => todo!(),
161        };
162    }
163
164    let max_ts = latest
165        .iter()
166        .filter_map(|value| match value {
167            FormulaValue::Sample(sample) => Some(sample.timestamp()),
168            FormulaValue::Quantity(_) => None,
169        })
170        .max()
171        .ok_or_else(|| crate::Error::internal("No receivers to synchronize".to_string()))?;
172
173    // synchronize all receivers to the latest timestamp
174    for (ii, item) in formula_items.iter_mut().enumerate() {
175        let FormulaOperand::Stream(receiver, _) = item else {
176            continue;
177        };
178        let mut ctr = 0;
179        while let FormulaValue::Sample(sample) = latest[ii]
180            && sample.timestamp() != max_ts
181            && ctr < 10
182        {
183            ctr += 1;
184            match receiver.recv().await {
185                Ok(sample) => latest[ii] = FormulaValue::Sample(sample),
186                Err(e) => {
187                    return Err(crate::Error::connection_failure(format!(
188                        "Could not receive sample: {e}"
189                    )));
190                }
191            };
192        }
193
194        if let FormulaValue::Sample(sample) = latest[ii]
195            && sample.timestamp() != max_ts
196        {
197            return Err(crate::Error::internal(format!(
198                "Could not synchronize receiver {} to the latest timestamp: {}",
199                ii, max_ts
200            )));
201        }
202    }
203
204    Ok(latest)
205}
206
207async fn synchronize_two_receivers<Q1: Quantity, Q2: Quantity>(
208    formula_item1: &mut FormulaOperand<Q1>,
209    formula_item2: &mut FormulaOperand<Q2>,
210) -> Result<(FormulaValue<Q1>, FormulaValue<Q2>), crate::Error> {
211    match (formula_item1, formula_item2) {
212        (FormulaOperand::Stream(rx1, _), FormulaOperand::Stream(rx2, _)) => {
213            let mut latest1 = rx1.recv().await.map_err(|e| {
214                crate::Error::connection_failure(format!("Could not receive sample: {e}"))
215            })?;
216            let mut latest2 = rx2.recv().await.map_err(|e| {
217                crate::Error::connection_failure(format!("Could not receive sample: {e}"))
218            })?;
219
220            let max_ts = latest1.timestamp().max(latest2.timestamp());
221
222            let mut ctr = 0;
223            while latest1.timestamp() != max_ts && ctr < 10 {
224                ctr += 1;
225                latest1 = rx1.recv().await.map_err(|e| {
226                    crate::Error::connection_failure(format!("Could not receive sample: {e}"))
227                })?;
228            }
229            if latest1.timestamp() != max_ts {
230                return Err(crate::Error::internal(format!(
231                    "Could not synchronize receiver 1 to the latest timestamp: {}",
232                    max_ts
233                )));
234            }
235
236            ctr = 0;
237            while latest2.timestamp() != max_ts && ctr < 10 {
238                ctr += 1;
239                latest2 = rx2.recv().await.map_err(|e| {
240                    crate::Error::connection_failure(format!("Could not receive sample: {e}"))
241                })?;
242            }
243            if latest2.timestamp() != max_ts {
244                return Err(crate::Error::internal(format!(
245                    "Could not synchronize receiver 2 to the latest timestamp: {}",
246                    max_ts
247                )));
248            }
249            Ok((FormulaValue::Sample(latest1), FormulaValue::Sample(latest2)))
250        }
251        (FormulaOperand::Formula(..), _) | (_, FormulaOperand::Formula(..)) => {
252            Err(crate::Error::internal(
253                "Internal: synchronize_two_receivers called on unsubscribed FormulaItem."
254                    .to_string(),
255            ))
256        }
257        (item1, item2) => Ok((
258            match item1.recv().await {
259                Ok(v) => v,
260                Err(e) => {
261                    tracing::error!("Could not receive sample: {e}");
262                    return Err(crate::Error::connection_failure(format!(
263                        "Could not receive sample: {e}"
264                    )));
265                }
266            },
267            match item2.recv().await {
268                Ok(v) => v,
269                Err(e) => {
270                    tracing::error!("Could not receive sample: {e}");
271                    return Err(crate::Error::connection_failure(format!(
272                        "Could not receive sample: {e}"
273                    )));
274                }
275            },
276        )),
277    }
278}
279
280async fn run_formula<Q: Quantity>(
281    mut formula_items: Vec<FormulaOperand<Q>>,
282    result_sender: broadcast::Sender<crate::Sample<Q>>,
283    apply_fn: fn(&[FormulaValue<Q>]) -> Option<crate::Sample<Q>>,
284) {
285    match synchronize_receivers(&mut formula_items).await {
286        Ok(latest) => {
287            let value = match apply_fn(&latest) {
288                Some(sample) => sample,
289                None => {
290                    tracing::debug!(
291                        "No value computed. Stopping processing. Input values: {:?}",
292                        latest
293                    );
294                    return;
295                }
296            };
297            if let Err(err) = result_sender.send(value) {
298                tracing::debug!("No subscribers: {}. Stopping processing", err);
299                return;
300            }
301        }
302        Err(e) => {
303            tracing::error!(
304                "Couldn't synchronize receivers: {}.  Stopping processing.",
305                e
306            );
307            return;
308        }
309    };
310    loop {
311        let mut latest = vec![];
312        for formula_item in formula_items.iter_mut() {
313            latest.push(match formula_item.recv().await {
314                Ok(value) => value,
315                Err(RecvError::Closed) => {
316                    tracing::debug!("input receiver closed. stopping formula processing.");
317                    return;
318                }
319                Err(RecvError::Lagged(count)) => {
320                    tracing::warn!("input receiver lagged by {count} samples.");
321                    continue;
322                }
323            });
324        }
325        if latest.is_empty() {
326            tracing::debug!("No active input receivers.  Stopping processing.");
327            return;
328        }
329
330        let value = match apply_fn(&latest) {
331            Some(sample) => sample,
332            None => {
333                tracing::debug!(
334                    "No value computed. Stopping processing. Input values: {:?}",
335                    latest
336                );
337                return;
338            }
339        };
340        if let Err(err) = result_sender.send(value) {
341            tracing::debug!("No subscribers: {}. Stopping processing", err);
342            return;
343        }
344    }
345}
346
347async fn run_two_item_formula<QOut, QIn1, QIn2>(
348    mut formula_item1: FormulaOperand<QIn1>,
349    mut formula_item2: FormulaOperand<QIn2>,
350    result_sender: broadcast::Sender<crate::Sample<QOut>>,
351    apply_fn: fn(&FormulaValue<QIn1>, &FormulaValue<QIn2>) -> Option<crate::Sample<QOut>>,
352) where
353    QOut: Quantity,
354    QIn1: Quantity,
355    QIn2: Quantity,
356{
357    match synchronize_two_receivers(&mut formula_item1, &mut formula_item2).await {
358        Ok((value1, value2)) => {
359            let value = match apply_fn(&value1, &value2) {
360                Some(sample) => sample,
361                None => {
362                    tracing::debug!(
363                        "No value computed. Stopping processing. Input values: {:?}, {:?}",
364                        value1,
365                        value2
366                    );
367                    return;
368                }
369            };
370            if let Err(err) = result_sender.send(value) {
371                tracing::debug!("No subscribers: {}. Stopping processing", err);
372                return;
373            }
374        }
375        Err(e) => {
376            tracing::error!(
377                "Couldn't synchronize receivers: {}.  Stopping processing.",
378                e
379            );
380            return;
381        }
382    }
383
384    loop {
385        let (value1, value2) = match (formula_item1.recv().await, formula_item2.recv().await) {
386            (Ok(v1), Ok(v2)) => (v1, v2),
387            (Err(RecvError::Closed), _) | (_, Err(RecvError::Closed)) => {
388                tracing::debug!("input receiver closed. stopping formula processing.");
389                return;
390            }
391            (Err(RecvError::Lagged(count)), _) | (_, Err(RecvError::Lagged(count))) => {
392                tracing::warn!("input receiver lagged by {count} samples.");
393                continue;
394            }
395        };
396
397        let result = match apply_fn(&value1, &value2) {
398            Some(sample) => sample,
399            None => {
400                tracing::error!(
401                    "No value computed. Stopping processing. Input values: {:?}, {:?}",
402                    value1,
403                    value2
404                );
405                return;
406            }
407        };
408
409        if let Err(err) = result_sender.send(result) {
410            tracing::debug!("No subscribers: {}. Stopping processing", err);
411            return;
412        }
413    }
414}
415
416#[async_trait]
417impl<QOut, QIn1, QIn2> FormulaSubscriber for Formula<QOut, QIn1, QIn2>
418where
419    QOut: Quantity + 'static,
420    QIn1: Quantity + 'static + std::ops::Mul<QIn2, Output = QOut>,
421    QIn2: Quantity + 'static,
422    // The below works only as long as QIn2 is unit-less.  If we want to
423    // support division for other quantities, we need to introduce additional
424    // generic types for division.
425    QIn1: std::ops::Div<QIn2, Output = QOut>,
426{
427    type QuantityType = QOut;
428
429    async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<QOut>>, Error> {
430        match &self {
431            Formula::Subscriber(formula) => (*formula).subscribe().await,
432            Formula::Coalesce(exprs)
433            | Formula::Min(exprs)
434            | Formula::Max(exprs)
435            | Formula::Avg(exprs)
436            | Formula::Add(exprs)
437            | Formula::Subtract(exprs) => {
438                let mut formula_items = Vec::new();
439                for expr in exprs {
440                    formula_items.push(expr.subscribe().await?);
441                }
442
443                let (tx, rx) = broadcast::channel(100);
444                tokio::spawn(run_formula(
445                    formula_items,
446                    tx,
447                    match self {
448                        Formula::Coalesce(_) => application_methods::coalesce_samples,
449                        Formula::Min(_) => application_methods::min_samples,
450                        Formula::Max(_) => application_methods::max_samples,
451                        Formula::Avg(_) => application_methods::avg_samples,
452                        Formula::Add(_) => application_methods::add_samples,
453                        Formula::Subtract(_) => application_methods::subtract_samples,
454                        _ => {
455                            // This case is unreachable due to the match above.
456                            return Err(Error::internal(
457                                "unexpected formula type in subscribe.".to_string(),
458                            ));
459                        }
460                    },
461                ));
462                Ok(rx)
463            }
464            Formula::Multiply(lhs, rhs) | Formula::Divide(lhs, rhs) => {
465                let lhs = lhs.subscribe().await?;
466                let rhs = rhs.subscribe().await?;
467
468                let (tx, rx) = broadcast::channel(100);
469                tokio::spawn(run_two_item_formula(
470                    lhs,
471                    rhs,
472                    tx,
473                    match self {
474                        Formula::Multiply(_, _) => application_methods::multiply_samples,
475                        Formula::Divide(_, _) => application_methods::divide_samples,
476                        _ => {
477                            // This case is unreachable due to the match above.
478                            return Err(Error::internal(
479                                "unexpected formula type in subscribe.".to_string(),
480                            ));
481                        }
482                    },
483                ));
484
485                Ok(rx)
486            }
487        }
488    }
489}