Skip to main content

frequenz_microgrid/logical_meter/formula/
async_formula.rs

1// License: MIT
2// Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3
4//! A nested formula that can contain other formulas.
5
6mod application_methods;
7mod composition;
8
9use async_trait::async_trait;
10use tokio::sync::broadcast::{self, error::RecvError};
11
12use crate::{Error, Sample, logical_meter::formula::FormulaSubscriber, quantity::Quantity};
13
14/// A composable Formula.
15pub enum Formula<QOut, QIn1 = QOut, QIn2 = f32>
16where
17    QOut: Quantity + 'static,
18    QIn1: Quantity + 'static,
19    QIn2: Quantity + 'static,
20    QIn1: std::ops::Mul<QIn2, Output = QOut>,
21{
22    Subscriber(Box<dyn FormulaSubscriber<QuantityType = QOut>>),
23    Coalesce(Vec<FormulaOperand<QOut>>),
24    Min(Vec<FormulaOperand<QOut>>),
25    Max(Vec<FormulaOperand<QOut>>),
26    Avg(Vec<FormulaOperand<QOut>>),
27    Add(Vec<FormulaOperand<QOut>>),
28    Subtract(Vec<FormulaOperand<QOut>>),
29    Multiply(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
30    Divide(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
31}
32
33pub enum FormulaOperand<Q: Quantity + 'static> {
34    Formula(Box<dyn FormulaSubscriber<QuantityType = Q>>),
35    Stream(broadcast::Receiver<crate::Sample<Q>>, String),
36    Quantity(Q),
37}
38
39impl<Q: Quantity + 'static> FormulaOperand<Q> {
40    async fn subscribe(&self) -> Result<FormulaOperand<Q>, Error> {
41        match self {
42            FormulaOperand::Formula(formula_subscriber) => Ok(FormulaOperand::Stream(
43                (*formula_subscriber).subscribe().await?,
44                formula_subscriber.to_string(),
45            )),
46            FormulaOperand::Stream(receiver, name) => {
47                Ok(FormulaOperand::Stream(receiver.resubscribe(), name.clone()))
48            }
49            FormulaOperand::Quantity(quantity) => Ok(FormulaOperand::Quantity(*quantity)),
50        }
51    }
52
53    async fn recv(&mut self) -> Result<FormulaValue<Q>, RecvError> {
54        match self {
55            FormulaOperand::Formula(..) => {
56                tracing::error!("Internal: FormulaItem::recv called on unsubscribed FormulaItem.");
57                Err(RecvError::Closed)
58            }
59            FormulaOperand::Stream(receiver, _) => match receiver.recv().await {
60                Ok(sample) => Ok(FormulaValue::Sample(sample)),
61                Err(e) => Err(e),
62            },
63            FormulaOperand::Quantity(q) => Ok(FormulaValue::Quantity(*q)),
64        }
65    }
66}
67
68impl<QOut, QIn1, QIn2> From<Formula<QOut, QIn1, QIn2>> for FormulaOperand<QOut>
69where
70    QOut: Quantity + 'static,
71    QIn1: Quantity + 'static,
72    QIn2: Quantity + 'static,
73    QOut: std::ops::Div<QIn2, Output = QIn1>,
74    QIn1: std::ops::Mul<QIn2, Output = QOut> + std::ops::Div<QIn2, Output = QOut>,
75{
76    fn from(formula: Formula<QOut, QIn1, QIn2>) -> Self {
77        FormulaOperand::Formula(Box::new(formula))
78    }
79}
80
81impl<Q> From<(broadcast::Receiver<crate::Sample<Q>>, String)> for FormulaOperand<Q>
82where
83    Q: Quantity + 'static,
84{
85    fn from(value: (broadcast::Receiver<crate::Sample<Q>>, String)) -> Self {
86        FormulaOperand::Stream(value.0, value.1)
87    }
88}
89
90impl<Q> From<Q> for FormulaOperand<Q>
91where
92    Q: Quantity + 'static,
93{
94    fn from(quantity: Q) -> Self {
95        FormulaOperand::Quantity(quantity)
96    }
97}
98
99impl<Q: Quantity + std::fmt::Display> std::fmt::Display for FormulaOperand<Q> {
100    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101        match self {
102            FormulaOperand::Formula(formula) => write!(f, "{formula}"),
103            FormulaOperand::Stream(_, name) => write!(f, "{name}"),
104            FormulaOperand::Quantity(q) => write!(f, "{q}"),
105        }
106    }
107}
108
109#[derive(Debug)]
110enum FormulaValue<Q: Quantity> {
111    Sample(crate::Sample<Q>),
112    Quantity(Q),
113}
114
115fn format_exprs<Q: Quantity + std::fmt::Display>(
116    f: &mut std::fmt::Formatter<'_>,
117    exprs: &[FormulaOperand<Q>],
118    prefix: &str,
119    sep: &str,
120) -> std::fmt::Result {
121    write!(f, "{prefix}(")?;
122    for (i, expr) in exprs.iter().enumerate() {
123        if i > 0 {
124            write!(f, "{sep}")?;
125        }
126        write!(f, "{expr}")?;
127    }
128    write!(f, ")")
129}
130
131impl<QOut, QIn1, QIn2> std::fmt::Display for Formula<QOut, QIn1, QIn2>
132where
133    QOut: Quantity,
134    QIn1: Quantity,
135    QIn2: Quantity,
136    QIn1: std::ops::Mul<QIn2, Output = QOut>,
137{
138    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        match self {
140            Formula::Subscriber(formula) => formula.fmt(f),
141            Formula::Coalesce(exprs) => format_exprs(f, exprs, "COALESCE", ", "),
142            Formula::Min(exprs) => format_exprs(f, exprs, "MIN", ", "),
143            Formula::Max(exprs) => format_exprs(f, exprs, "MAX", ", "),
144            Formula::Avg(exprs) => format_exprs(f, exprs, "AVG", ", "),
145            Formula::Add(exprs) => format_exprs(f, exprs, "", " + "),
146            Formula::Subtract(exprs) => format_exprs(f, exprs, "", " - "),
147            Formula::Multiply(lhs, rhs) => write!(f, "({lhs} * {rhs})"),
148            Formula::Divide(lhs, rhs) => write!(f, "({lhs} / {rhs})"),
149        }
150    }
151}
152
153async fn synchronize_receivers<Q: Quantity>(
154    formula_items: &mut [FormulaOperand<Q>],
155) -> Result<Vec<FormulaValue<Q>>, crate::Error> {
156    let mut latest = vec![];
157    for item in formula_items.iter_mut() {
158        match item.recv().await {
159            Ok(vv) => latest.push(vv),
160            Err(e) => {
161                return Err(crate::Error::internal(format!(
162                    "Failed to receive from formula operand: {e}"
163                )));
164            }
165        };
166    }
167
168    let max_ts = latest
169        .iter()
170        .filter_map(|value| match value {
171            FormulaValue::Sample(sample) => Some(sample.timestamp()),
172            FormulaValue::Quantity(_) => None,
173        })
174        .max()
175        .ok_or_else(|| crate::Error::internal("No receivers to synchronize".to_string()))?;
176
177    // synchronize all receivers to the latest timestamp
178    for (ii, item) in formula_items.iter_mut().enumerate() {
179        let FormulaOperand::Stream(receiver, _) = item else {
180            continue;
181        };
182        let mut ctr = 0;
183        while let FormulaValue::Sample(sample) = latest[ii]
184            && sample.timestamp() != max_ts
185            && ctr < 10
186        {
187            ctr += 1;
188            match receiver.recv().await {
189                Ok(sample) => latest[ii] = FormulaValue::Sample(sample),
190                Err(e) => {
191                    return Err(crate::Error::connection_failure(format!(
192                        "Could not receive sample: {e}"
193                    )));
194                }
195            };
196        }
197
198        if let FormulaValue::Sample(sample) = latest[ii]
199            && sample.timestamp() != max_ts
200        {
201            return Err(crate::Error::internal(format!(
202                "Could not synchronize receiver {} to the latest timestamp: {}",
203                ii, max_ts
204            )));
205        }
206    }
207
208    Ok(latest)
209}
210
211async fn synchronize_two_receivers<Q1: Quantity, Q2: Quantity>(
212    formula_item1: &mut FormulaOperand<Q1>,
213    formula_item2: &mut FormulaOperand<Q2>,
214) -> Result<(FormulaValue<Q1>, FormulaValue<Q2>), crate::Error> {
215    match (formula_item1, formula_item2) {
216        (FormulaOperand::Stream(rx1, _), FormulaOperand::Stream(rx2, _)) => {
217            let mut latest1 = rx1.recv().await.map_err(|e| {
218                crate::Error::connection_failure(format!("Could not receive sample: {e}"))
219            })?;
220            let mut latest2 = rx2.recv().await.map_err(|e| {
221                crate::Error::connection_failure(format!("Could not receive sample: {e}"))
222            })?;
223
224            let max_ts = latest1.timestamp().max(latest2.timestamp());
225
226            let mut ctr = 0;
227            while latest1.timestamp() != max_ts && ctr < 10 {
228                ctr += 1;
229                latest1 = rx1.recv().await.map_err(|e| {
230                    crate::Error::connection_failure(format!("Could not receive sample: {e}"))
231                })?;
232            }
233            if latest1.timestamp() != max_ts {
234                return Err(crate::Error::internal(format!(
235                    "Could not synchronize receiver 1 to the latest timestamp: {}",
236                    max_ts
237                )));
238            }
239
240            ctr = 0;
241            while latest2.timestamp() != max_ts && ctr < 10 {
242                ctr += 1;
243                latest2 = rx2.recv().await.map_err(|e| {
244                    crate::Error::connection_failure(format!("Could not receive sample: {e}"))
245                })?;
246            }
247            if latest2.timestamp() != max_ts {
248                return Err(crate::Error::internal(format!(
249                    "Could not synchronize receiver 2 to the latest timestamp: {}",
250                    max_ts
251                )));
252            }
253            Ok((FormulaValue::Sample(latest1), FormulaValue::Sample(latest2)))
254        }
255        (FormulaOperand::Formula(..), _) | (_, FormulaOperand::Formula(..)) => {
256            Err(crate::Error::internal(
257                "Internal: synchronize_two_receivers called on unsubscribed FormulaItem."
258                    .to_string(),
259            ))
260        }
261        (item1, item2) => Ok((
262            match item1.recv().await {
263                Ok(v) => v,
264                Err(e) => {
265                    tracing::error!("Could not receive sample: {e}");
266                    return Err(crate::Error::connection_failure(format!(
267                        "Could not receive sample: {e}"
268                    )));
269                }
270            },
271            match item2.recv().await {
272                Ok(v) => v,
273                Err(e) => {
274                    tracing::error!("Could not receive sample: {e}");
275                    return Err(crate::Error::connection_failure(format!(
276                        "Could not receive sample: {e}"
277                    )));
278                }
279            },
280        )),
281    }
282}
283
284async fn run_formula<Q: Quantity>(
285    mut formula_items: Vec<FormulaOperand<Q>>,
286    result_sender: broadcast::Sender<crate::Sample<Q>>,
287    apply_fn: fn(&[FormulaValue<Q>]) -> Option<crate::Sample<Q>>,
288) {
289    match synchronize_receivers(&mut formula_items).await {
290        Ok(latest) => {
291            let value = match apply_fn(&latest) {
292                Some(sample) => sample,
293                None => {
294                    tracing::debug!(
295                        "No value computed. Stopping processing. Input values: {:?}",
296                        latest
297                    );
298                    return;
299                }
300            };
301            if let Err(err) = result_sender.send(value) {
302                tracing::debug!("No subscribers: {}. Stopping processing", err);
303                return;
304            }
305        }
306        Err(e) => {
307            tracing::error!(
308                "Couldn't synchronize receivers: {}.  Stopping processing.",
309                e
310            );
311            return;
312        }
313    };
314    loop {
315        let mut latest = vec![];
316        for formula_item in formula_items.iter_mut() {
317            latest.push(match formula_item.recv().await {
318                Ok(value) => value,
319                Err(RecvError::Closed) => {
320                    tracing::debug!("input receiver closed. stopping formula processing.");
321                    return;
322                }
323                Err(RecvError::Lagged(count)) => {
324                    tracing::warn!("input receiver lagged by {count} samples.");
325                    continue;
326                }
327            });
328        }
329        if latest.is_empty() {
330            tracing::debug!("No active input receivers.  Stopping processing.");
331            return;
332        }
333
334        let value = match apply_fn(&latest) {
335            Some(sample) => sample,
336            None => {
337                tracing::debug!(
338                    "No value computed. Stopping processing. Input values: {:?}",
339                    latest
340                );
341                return;
342            }
343        };
344        if let Err(err) = result_sender.send(value) {
345            tracing::debug!("No subscribers: {}. Stopping processing", err);
346            return;
347        }
348    }
349}
350
351async fn run_two_item_formula<QOut, QIn1, QIn2>(
352    mut formula_item1: FormulaOperand<QIn1>,
353    mut formula_item2: FormulaOperand<QIn2>,
354    result_sender: broadcast::Sender<crate::Sample<QOut>>,
355    apply_fn: fn(&FormulaValue<QIn1>, &FormulaValue<QIn2>) -> Option<crate::Sample<QOut>>,
356) where
357    QOut: Quantity,
358    QIn1: Quantity,
359    QIn2: Quantity,
360{
361    match synchronize_two_receivers(&mut formula_item1, &mut formula_item2).await {
362        Ok((value1, value2)) => {
363            let value = match apply_fn(&value1, &value2) {
364                Some(sample) => sample,
365                None => {
366                    tracing::debug!(
367                        "No value computed. Stopping processing. Input values: {:?}, {:?}",
368                        value1,
369                        value2
370                    );
371                    return;
372                }
373            };
374            if let Err(err) = result_sender.send(value) {
375                tracing::debug!("No subscribers: {}. Stopping processing", err);
376                return;
377            }
378        }
379        Err(e) => {
380            tracing::error!(
381                "Couldn't synchronize receivers: {}.  Stopping processing.",
382                e
383            );
384            return;
385        }
386    }
387
388    loop {
389        let (value1, value2) = match (formula_item1.recv().await, formula_item2.recv().await) {
390            (Ok(v1), Ok(v2)) => (v1, v2),
391            (Err(RecvError::Closed), _) | (_, Err(RecvError::Closed)) => {
392                tracing::debug!("input receiver closed. stopping formula processing.");
393                return;
394            }
395            (Err(RecvError::Lagged(count)), _) | (_, Err(RecvError::Lagged(count))) => {
396                tracing::warn!("input receiver lagged by {count} samples.");
397                continue;
398            }
399        };
400
401        let result = match apply_fn(&value1, &value2) {
402            Some(sample) => sample,
403            None => {
404                tracing::error!(
405                    "No value computed. Stopping processing. Input values: {:?}, {:?}",
406                    value1,
407                    value2
408                );
409                return;
410            }
411        };
412
413        if let Err(err) = result_sender.send(result) {
414            tracing::debug!("No subscribers: {}. Stopping processing", err);
415            return;
416        }
417    }
418}
419
420#[async_trait]
421impl<QOut, QIn1, QIn2> FormulaSubscriber for Formula<QOut, QIn1, QIn2>
422where
423    QOut: Quantity + 'static,
424    QIn1: Quantity + 'static + std::ops::Mul<QIn2, Output = QOut>,
425    QIn2: Quantity + 'static,
426    // The below works only as long as QIn2 is unit-less.  If we want to
427    // support division for other quantities, we need to introduce additional
428    // generic types for division.
429    QIn1: std::ops::Div<QIn2, Output = QOut>,
430{
431    type QuantityType = QOut;
432
433    async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<QOut>>, Error> {
434        match &self {
435            Formula::Subscriber(formula) => (*formula).subscribe().await,
436            Formula::Coalesce(exprs)
437            | Formula::Min(exprs)
438            | Formula::Max(exprs)
439            | Formula::Avg(exprs)
440            | Formula::Add(exprs)
441            | Formula::Subtract(exprs) => {
442                let mut formula_items = Vec::new();
443                for expr in exprs {
444                    formula_items.push(expr.subscribe().await?);
445                }
446
447                let (tx, rx) = broadcast::channel(100);
448                tokio::spawn(run_formula(
449                    formula_items,
450                    tx,
451                    match self {
452                        Formula::Coalesce(_) => application_methods::coalesce_samples,
453                        Formula::Min(_) => application_methods::min_samples,
454                        Formula::Max(_) => application_methods::max_samples,
455                        Formula::Avg(_) => application_methods::avg_samples,
456                        Formula::Add(_) => application_methods::add_samples,
457                        Formula::Subtract(_) => application_methods::subtract_samples,
458                        _ => {
459                            // This case is unreachable due to the match above.
460                            return Err(Error::internal(
461                                "unexpected formula type in subscribe.".to_string(),
462                            ));
463                        }
464                    },
465                ));
466                Ok(rx)
467            }
468            Formula::Multiply(lhs, rhs) | Formula::Divide(lhs, rhs) => {
469                let lhs = lhs.subscribe().await?;
470                let rhs = rhs.subscribe().await?;
471
472                let (tx, rx) = broadcast::channel(100);
473                tokio::spawn(run_two_item_formula(
474                    lhs,
475                    rhs,
476                    tx,
477                    match self {
478                        Formula::Multiply(_, _) => application_methods::multiply_samples,
479                        Formula::Divide(_, _) => application_methods::divide_samples,
480                        _ => {
481                            // This case is unreachable due to the match above.
482                            return Err(Error::internal(
483                                "unexpected formula type in subscribe.".to_string(),
484                            ));
485                        }
486                    },
487                ));
488
489                Ok(rx)
490            }
491        }
492    }
493}