1mod application_methods;
7mod composition;
8
9use async_trait::async_trait;
10use tokio::sync::broadcast::{self, error::RecvError};
11
12use crate::{Error, Sample, logical_meter::formula::FormulaSubscriber, quantity::Quantity};
13
14pub enum Formula<QOut, QIn1 = QOut, QIn2 = f32>
16where
17 QOut: Quantity + 'static,
18 QIn1: Quantity + 'static,
19 QIn2: Quantity + 'static,
20 QIn1: std::ops::Mul<QIn2, Output = QOut>,
21{
22 Subscriber(Box<dyn FormulaSubscriber<QuantityType = QOut>>),
23 Coalesce(Vec<FormulaOperand<QOut>>),
24 Min(Vec<FormulaOperand<QOut>>),
25 Max(Vec<FormulaOperand<QOut>>),
26 Avg(Vec<FormulaOperand<QOut>>),
27 Add(Vec<FormulaOperand<QOut>>),
28 Subtract(Vec<FormulaOperand<QOut>>),
29 Multiply(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
30 Divide(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
31}
32
33pub enum FormulaOperand<Q: Quantity + 'static> {
34 Formula(Box<dyn FormulaSubscriber<QuantityType = Q>>),
35 Stream(broadcast::Receiver<crate::Sample<Q>>, String),
36 Quantity(Q),
37}
38
39impl<Q: Quantity + 'static> FormulaOperand<Q> {
40 async fn subscribe(&self) -> Result<FormulaOperand<Q>, Error> {
41 match self {
42 FormulaOperand::Formula(formula_subscriber) => Ok(FormulaOperand::Stream(
43 (*formula_subscriber).subscribe().await?,
44 formula_subscriber.to_string(),
45 )),
46 FormulaOperand::Stream(receiver, name) => {
47 Ok(FormulaOperand::Stream(receiver.resubscribe(), name.clone()))
48 }
49 FormulaOperand::Quantity(quantity) => Ok(FormulaOperand::Quantity(*quantity)),
50 }
51 }
52
53 async fn recv(&mut self) -> Result<FormulaValue<Q>, RecvError> {
54 match self {
55 FormulaOperand::Formula(..) => {
56 tracing::error!("Internal: FormulaItem::recv called on unsubscribed FormulaItem.");
57 Err(RecvError::Closed)
58 }
59 FormulaOperand::Stream(receiver, _) => match receiver.recv().await {
60 Ok(sample) => Ok(FormulaValue::Sample(sample)),
61 Err(e) => Err(e),
62 },
63 FormulaOperand::Quantity(q) => Ok(FormulaValue::Quantity(*q)),
64 }
65 }
66}
67
68impl<QOut, QIn1, QIn2> From<Formula<QOut, QIn1, QIn2>> for FormulaOperand<QOut>
69where
70 QOut: Quantity + 'static,
71 QIn1: Quantity + 'static,
72 QIn2: Quantity + 'static,
73 QOut: std::ops::Div<QIn2, Output = QIn1>,
74 QIn1: std::ops::Mul<QIn2, Output = QOut> + std::ops::Div<QIn2, Output = QOut>,
75{
76 fn from(formula: Formula<QOut, QIn1, QIn2>) -> Self {
77 FormulaOperand::Formula(Box::new(formula))
78 }
79}
80
81impl<Q> From<(broadcast::Receiver<crate::Sample<Q>>, String)> for FormulaOperand<Q>
82where
83 Q: Quantity + 'static,
84{
85 fn from(value: (broadcast::Receiver<crate::Sample<Q>>, String)) -> Self {
86 FormulaOperand::Stream(value.0, value.1)
87 }
88}
89
90impl<Q> From<Q> for FormulaOperand<Q>
91where
92 Q: Quantity + 'static,
93{
94 fn from(quantity: Q) -> Self {
95 FormulaOperand::Quantity(quantity)
96 }
97}
98
99impl<Q: Quantity + std::fmt::Display> std::fmt::Display for FormulaOperand<Q> {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 FormulaOperand::Formula(formula) => write!(f, "{formula}"),
103 FormulaOperand::Stream(_, name) => write!(f, "{name}"),
104 FormulaOperand::Quantity(q) => write!(f, "{q}"),
105 }
106 }
107}
108
109#[derive(Debug)]
110enum FormulaValue<Q: Quantity> {
111 Sample(crate::Sample<Q>),
112 Quantity(Q),
113}
114
115fn format_exprs<Q: Quantity + std::fmt::Display>(
116 f: &mut std::fmt::Formatter<'_>,
117 exprs: &[FormulaOperand<Q>],
118 prefix: &str,
119 sep: &str,
120) -> std::fmt::Result {
121 write!(f, "{prefix}(")?;
122 for (i, expr) in exprs.iter().enumerate() {
123 if i > 0 {
124 write!(f, "{sep}")?;
125 }
126 write!(f, "{expr}")?;
127 }
128 write!(f, ")")
129}
130
131impl<QOut, QIn1, QIn2> std::fmt::Display for Formula<QOut, QIn1, QIn2>
132where
133 QOut: Quantity,
134 QIn1: Quantity,
135 QIn2: Quantity,
136 QIn1: std::ops::Mul<QIn2, Output = QOut>,
137{
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 match self {
140 Formula::Subscriber(formula) => formula.fmt(f),
141 Formula::Coalesce(exprs) => format_exprs(f, exprs, "COALESCE", ", "),
142 Formula::Min(exprs) => format_exprs(f, exprs, "MIN", ", "),
143 Formula::Max(exprs) => format_exprs(f, exprs, "MAX", ", "),
144 Formula::Avg(exprs) => format_exprs(f, exprs, "AVG", ", "),
145 Formula::Add(exprs) => format_exprs(f, exprs, "", " + "),
146 Formula::Subtract(exprs) => format_exprs(f, exprs, "", " - "),
147 Formula::Multiply(lhs, rhs) => write!(f, "({lhs} * {rhs})"),
148 Formula::Divide(lhs, rhs) => write!(f, "({lhs} / {rhs})"),
149 }
150 }
151}
152
153async fn synchronize_receivers<Q: Quantity>(
154 formula_items: &mut [FormulaOperand<Q>],
155) -> Result<Vec<FormulaValue<Q>>, crate::Error> {
156 let mut latest = vec![];
157 for item in formula_items.iter_mut() {
158 match item.recv().await {
159 Ok(vv) => latest.push(vv),
160 Err(e) => {
161 return Err(crate::Error::internal(format!(
162 "Failed to receive from formula operand: {e}"
163 )));
164 }
165 };
166 }
167
168 let max_ts = latest
169 .iter()
170 .filter_map(|value| match value {
171 FormulaValue::Sample(sample) => Some(sample.timestamp()),
172 FormulaValue::Quantity(_) => None,
173 })
174 .max()
175 .ok_or_else(|| crate::Error::internal("No receivers to synchronize".to_string()))?;
176
177 for (ii, item) in formula_items.iter_mut().enumerate() {
179 let FormulaOperand::Stream(receiver, _) = item else {
180 continue;
181 };
182 let mut ctr = 0;
183 while let FormulaValue::Sample(sample) = latest[ii]
184 && sample.timestamp() != max_ts
185 && ctr < 10
186 {
187 ctr += 1;
188 match receiver.recv().await {
189 Ok(sample) => latest[ii] = FormulaValue::Sample(sample),
190 Err(e) => {
191 return Err(crate::Error::connection_failure(format!(
192 "Could not receive sample: {e}"
193 )));
194 }
195 };
196 }
197
198 if let FormulaValue::Sample(sample) = latest[ii]
199 && sample.timestamp() != max_ts
200 {
201 return Err(crate::Error::internal(format!(
202 "Could not synchronize receiver {} to the latest timestamp: {}",
203 ii, max_ts
204 )));
205 }
206 }
207
208 Ok(latest)
209}
210
211async fn synchronize_two_receivers<Q1: Quantity, Q2: Quantity>(
212 formula_item1: &mut FormulaOperand<Q1>,
213 formula_item2: &mut FormulaOperand<Q2>,
214) -> Result<(FormulaValue<Q1>, FormulaValue<Q2>), crate::Error> {
215 match (formula_item1, formula_item2) {
216 (FormulaOperand::Stream(rx1, _), FormulaOperand::Stream(rx2, _)) => {
217 let mut latest1 = rx1.recv().await.map_err(|e| {
218 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
219 })?;
220 let mut latest2 = rx2.recv().await.map_err(|e| {
221 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
222 })?;
223
224 let max_ts = latest1.timestamp().max(latest2.timestamp());
225
226 let mut ctr = 0;
227 while latest1.timestamp() != max_ts && ctr < 10 {
228 ctr += 1;
229 latest1 = rx1.recv().await.map_err(|e| {
230 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
231 })?;
232 }
233 if latest1.timestamp() != max_ts {
234 return Err(crate::Error::internal(format!(
235 "Could not synchronize receiver 1 to the latest timestamp: {}",
236 max_ts
237 )));
238 }
239
240 ctr = 0;
241 while latest2.timestamp() != max_ts && ctr < 10 {
242 ctr += 1;
243 latest2 = rx2.recv().await.map_err(|e| {
244 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
245 })?;
246 }
247 if latest2.timestamp() != max_ts {
248 return Err(crate::Error::internal(format!(
249 "Could not synchronize receiver 2 to the latest timestamp: {}",
250 max_ts
251 )));
252 }
253 Ok((FormulaValue::Sample(latest1), FormulaValue::Sample(latest2)))
254 }
255 (FormulaOperand::Formula(..), _) | (_, FormulaOperand::Formula(..)) => {
256 Err(crate::Error::internal(
257 "Internal: synchronize_two_receivers called on unsubscribed FormulaItem."
258 .to_string(),
259 ))
260 }
261 (item1, item2) => Ok((
262 match item1.recv().await {
263 Ok(v) => v,
264 Err(e) => {
265 tracing::error!("Could not receive sample: {e}");
266 return Err(crate::Error::connection_failure(format!(
267 "Could not receive sample: {e}"
268 )));
269 }
270 },
271 match item2.recv().await {
272 Ok(v) => v,
273 Err(e) => {
274 tracing::error!("Could not receive sample: {e}");
275 return Err(crate::Error::connection_failure(format!(
276 "Could not receive sample: {e}"
277 )));
278 }
279 },
280 )),
281 }
282}
283
284async fn run_formula<Q: Quantity>(
285 mut formula_items: Vec<FormulaOperand<Q>>,
286 result_sender: broadcast::Sender<crate::Sample<Q>>,
287 apply_fn: fn(&[FormulaValue<Q>]) -> Option<crate::Sample<Q>>,
288) {
289 match synchronize_receivers(&mut formula_items).await {
290 Ok(latest) => {
291 let value = match apply_fn(&latest) {
292 Some(sample) => sample,
293 None => {
294 tracing::debug!(
295 "No value computed. Stopping processing. Input values: {:?}",
296 latest
297 );
298 return;
299 }
300 };
301 if let Err(err) = result_sender.send(value) {
302 tracing::debug!("No subscribers: {}. Stopping processing", err);
303 return;
304 }
305 }
306 Err(e) => {
307 tracing::error!(
308 "Couldn't synchronize receivers: {}. Stopping processing.",
309 e
310 );
311 return;
312 }
313 };
314 loop {
315 let mut latest = vec![];
316 for formula_item in formula_items.iter_mut() {
317 latest.push(match formula_item.recv().await {
318 Ok(value) => value,
319 Err(RecvError::Closed) => {
320 tracing::debug!("input receiver closed. stopping formula processing.");
321 return;
322 }
323 Err(RecvError::Lagged(count)) => {
324 tracing::warn!("input receiver lagged by {count} samples.");
325 continue;
326 }
327 });
328 }
329 if latest.is_empty() {
330 tracing::debug!("No active input receivers. Stopping processing.");
331 return;
332 }
333
334 let value = match apply_fn(&latest) {
335 Some(sample) => sample,
336 None => {
337 tracing::debug!(
338 "No value computed. Stopping processing. Input values: {:?}",
339 latest
340 );
341 return;
342 }
343 };
344 if let Err(err) = result_sender.send(value) {
345 tracing::debug!("No subscribers: {}. Stopping processing", err);
346 return;
347 }
348 }
349}
350
351async fn run_two_item_formula<QOut, QIn1, QIn2>(
352 mut formula_item1: FormulaOperand<QIn1>,
353 mut formula_item2: FormulaOperand<QIn2>,
354 result_sender: broadcast::Sender<crate::Sample<QOut>>,
355 apply_fn: fn(&FormulaValue<QIn1>, &FormulaValue<QIn2>) -> Option<crate::Sample<QOut>>,
356) where
357 QOut: Quantity,
358 QIn1: Quantity,
359 QIn2: Quantity,
360{
361 match synchronize_two_receivers(&mut formula_item1, &mut formula_item2).await {
362 Ok((value1, value2)) => {
363 let value = match apply_fn(&value1, &value2) {
364 Some(sample) => sample,
365 None => {
366 tracing::debug!(
367 "No value computed. Stopping processing. Input values: {:?}, {:?}",
368 value1,
369 value2
370 );
371 return;
372 }
373 };
374 if let Err(err) = result_sender.send(value) {
375 tracing::debug!("No subscribers: {}. Stopping processing", err);
376 return;
377 }
378 }
379 Err(e) => {
380 tracing::error!(
381 "Couldn't synchronize receivers: {}. Stopping processing.",
382 e
383 );
384 return;
385 }
386 }
387
388 loop {
389 let (value1, value2) = match (formula_item1.recv().await, formula_item2.recv().await) {
390 (Ok(v1), Ok(v2)) => (v1, v2),
391 (Err(RecvError::Closed), _) | (_, Err(RecvError::Closed)) => {
392 tracing::debug!("input receiver closed. stopping formula processing.");
393 return;
394 }
395 (Err(RecvError::Lagged(count)), _) | (_, Err(RecvError::Lagged(count))) => {
396 tracing::warn!("input receiver lagged by {count} samples.");
397 continue;
398 }
399 };
400
401 let result = match apply_fn(&value1, &value2) {
402 Some(sample) => sample,
403 None => {
404 tracing::error!(
405 "No value computed. Stopping processing. Input values: {:?}, {:?}",
406 value1,
407 value2
408 );
409 return;
410 }
411 };
412
413 if let Err(err) = result_sender.send(result) {
414 tracing::debug!("No subscribers: {}. Stopping processing", err);
415 return;
416 }
417 }
418}
419
420#[async_trait]
421impl<QOut, QIn1, QIn2> FormulaSubscriber for Formula<QOut, QIn1, QIn2>
422where
423 QOut: Quantity + 'static,
424 QIn1: Quantity + 'static + std::ops::Mul<QIn2, Output = QOut>,
425 QIn2: Quantity + 'static,
426 QIn1: std::ops::Div<QIn2, Output = QOut>,
430{
431 type QuantityType = QOut;
432
433 async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<QOut>>, Error> {
434 match &self {
435 Formula::Subscriber(formula) => (*formula).subscribe().await,
436 Formula::Coalesce(exprs)
437 | Formula::Min(exprs)
438 | Formula::Max(exprs)
439 | Formula::Avg(exprs)
440 | Formula::Add(exprs)
441 | Formula::Subtract(exprs) => {
442 let mut formula_items = Vec::new();
443 for expr in exprs {
444 formula_items.push(expr.subscribe().await?);
445 }
446
447 let (tx, rx) = broadcast::channel(100);
448 tokio::spawn(run_formula(
449 formula_items,
450 tx,
451 match self {
452 Formula::Coalesce(_) => application_methods::coalesce_samples,
453 Formula::Min(_) => application_methods::min_samples,
454 Formula::Max(_) => application_methods::max_samples,
455 Formula::Avg(_) => application_methods::avg_samples,
456 Formula::Add(_) => application_methods::add_samples,
457 Formula::Subtract(_) => application_methods::subtract_samples,
458 _ => {
459 return Err(Error::internal(
461 "unexpected formula type in subscribe.".to_string(),
462 ));
463 }
464 },
465 ));
466 Ok(rx)
467 }
468 Formula::Multiply(lhs, rhs) | Formula::Divide(lhs, rhs) => {
469 let lhs = lhs.subscribe().await?;
470 let rhs = rhs.subscribe().await?;
471
472 let (tx, rx) = broadcast::channel(100);
473 tokio::spawn(run_two_item_formula(
474 lhs,
475 rhs,
476 tx,
477 match self {
478 Formula::Multiply(_, _) => application_methods::multiply_samples,
479 Formula::Divide(_, _) => application_methods::divide_samples,
480 _ => {
481 return Err(Error::internal(
483 "unexpected formula type in subscribe.".to_string(),
484 ));
485 }
486 },
487 ));
488
489 Ok(rx)
490 }
491 }
492 }
493}