1mod application_methods;
7mod composition;
8
9use async_trait::async_trait;
10use tokio::sync::broadcast::{self, error::RecvError};
11
12use crate::{Error, Sample, logical_meter::formula::FormulaSubscriber, quantity::Quantity};
13
14pub enum Formula<QOut, QIn1 = QOut, QIn2 = f32>
16where
17 QOut: Quantity + 'static,
18 QIn1: Quantity + 'static,
19 QIn2: Quantity + 'static,
20 QIn1: std::ops::Mul<QIn2, Output = QOut>,
21{
22 Subscriber(Box<dyn FormulaSubscriber<QuantityType = QOut>>),
23 Coalesce(Vec<FormulaOperand<QOut>>),
24 Min(Vec<FormulaOperand<QOut>>),
25 Max(Vec<FormulaOperand<QOut>>),
26 Avg(Vec<FormulaOperand<QOut>>),
27 Add(Vec<FormulaOperand<QOut>>),
28 Subtract(Vec<FormulaOperand<QOut>>),
29 Multiply(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
30 Divide(FormulaOperand<QIn1>, FormulaOperand<QIn2>),
31}
32
33pub enum FormulaOperand<Q: Quantity + 'static> {
34 Formula(Box<dyn FormulaSubscriber<QuantityType = Q>>),
35 Stream(broadcast::Receiver<crate::Sample<Q>>, String),
36 Quantity(Q),
37}
38
39impl<Q: Quantity + 'static> FormulaOperand<Q> {
40 async fn subscribe(&self) -> Result<FormulaOperand<Q>, Error> {
41 match self {
42 FormulaOperand::Formula(formula_subscriber) => Ok(FormulaOperand::Stream(
43 (*formula_subscriber).subscribe().await?,
44 formula_subscriber.to_string(),
45 )),
46 FormulaOperand::Stream(receiver, name) => {
47 Ok(FormulaOperand::Stream(receiver.resubscribe(), name.clone()))
48 }
49 FormulaOperand::Quantity(quantity) => Ok(FormulaOperand::Quantity(*quantity)),
50 }
51 }
52
53 async fn recv(&mut self) -> Result<FormulaValue<Q>, RecvError> {
54 match self {
55 FormulaOperand::Formula(..) => {
56 tracing::error!("Internal: FormulaItem::recv called on unsubscribed FormulaItem.");
57 Err(RecvError::Closed)
58 }
59 FormulaOperand::Stream(receiver, _) => match receiver.recv().await {
60 Ok(sample) => Ok(FormulaValue::Sample(sample)),
61 Err(e) => Err(e),
62 },
63 FormulaOperand::Quantity(q) => Ok(FormulaValue::Quantity(*q)),
64 }
65 }
66}
67
68impl<QOut, QIn1, QIn2> From<Formula<QOut, QIn1, QIn2>> for FormulaOperand<QOut>
69where
70 QOut: Quantity + 'static,
71 QIn1: Quantity + 'static,
72 QIn2: Quantity + 'static,
73 QOut: std::ops::Div<QIn2, Output = QIn1>,
74 QIn1: std::ops::Mul<QIn2, Output = QOut> + std::ops::Div<QIn2, Output = QOut>,
75{
76 fn from(formula: Formula<QOut, QIn1, QIn2>) -> Self {
77 FormulaOperand::Formula(Box::new(formula))
78 }
79}
80
81impl<Q> From<(broadcast::Receiver<crate::Sample<Q>>, String)> for FormulaOperand<Q>
82where
83 Q: Quantity + 'static,
84{
85 fn from(value: (broadcast::Receiver<crate::Sample<Q>>, String)) -> Self {
86 FormulaOperand::Stream(value.0, value.1)
87 }
88}
89
90impl<Q> From<Q> for FormulaOperand<Q>
91where
92 Q: Quantity + 'static,
93{
94 fn from(quantity: Q) -> Self {
95 FormulaOperand::Quantity(quantity)
96 }
97}
98
99impl<Q: Quantity + std::fmt::Display> std::fmt::Display for FormulaOperand<Q> {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 match self {
102 FormulaOperand::Formula(formula) => write!(f, "{formula}"),
103 FormulaOperand::Stream(_, name) => write!(f, "{name}"),
104 FormulaOperand::Quantity(q) => write!(f, "{q}"),
105 }
106 }
107}
108
109#[derive(Debug)]
110enum FormulaValue<Q: Quantity> {
111 Sample(crate::Sample<Q>),
112 Quantity(Q),
113}
114
115fn format_exprs<Q: Quantity + std::fmt::Display>(
116 f: &mut std::fmt::Formatter<'_>,
117 exprs: &[FormulaOperand<Q>],
118 prefix: &str,
119 sep: &str,
120) -> std::fmt::Result {
121 write!(f, "{prefix}(")?;
122 for (i, expr) in exprs.iter().enumerate() {
123 if i > 0 {
124 write!(f, "{sep}")?;
125 }
126 write!(f, "{expr}")?;
127 }
128 write!(f, ")")
129}
130
131impl<QOut, QIn1, QIn2> std::fmt::Display for Formula<QOut, QIn1, QIn2>
132where
133 QOut: Quantity,
134 QIn1: Quantity,
135 QIn2: Quantity,
136 QIn1: std::ops::Mul<QIn2, Output = QOut>,
137{
138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 match self {
140 Formula::Subscriber(formula) => formula.fmt(f),
141 Formula::Coalesce(exprs) => format_exprs(f, exprs, "COALESCE", ", "),
142 Formula::Min(exprs) => format_exprs(f, exprs, "MIN", ", "),
143 Formula::Max(exprs) => format_exprs(f, exprs, "MAX", ", "),
144 Formula::Avg(exprs) => format_exprs(f, exprs, "AVG", ", "),
145 Formula::Add(exprs) => format_exprs(f, exprs, "", " + "),
146 Formula::Subtract(exprs) => format_exprs(f, exprs, "", " - "),
147 Formula::Multiply(lhs, rhs) => write!(f, "({lhs} * {rhs})"),
148 Formula::Divide(lhs, rhs) => write!(f, "({lhs} / {rhs})"),
149 }
150 }
151}
152
153async fn synchronize_receivers<Q: Quantity>(
154 formula_items: &mut [FormulaOperand<Q>],
155) -> Result<Vec<FormulaValue<Q>>, crate::Error> {
156 let mut latest = vec![];
157 for item in formula_items.iter_mut() {
158 match item.recv().await {
159 Ok(vv) => latest.push(vv),
160 Err(_) => todo!(),
161 };
162 }
163
164 let max_ts = latest
165 .iter()
166 .filter_map(|value| match value {
167 FormulaValue::Sample(sample) => Some(sample.timestamp()),
168 FormulaValue::Quantity(_) => None,
169 })
170 .max()
171 .ok_or_else(|| crate::Error::internal("No receivers to synchronize".to_string()))?;
172
173 for (ii, item) in formula_items.iter_mut().enumerate() {
175 let FormulaOperand::Stream(receiver, _) = item else {
176 continue;
177 };
178 let mut ctr = 0;
179 while let FormulaValue::Sample(sample) = latest[ii]
180 && sample.timestamp() != max_ts
181 && ctr < 10
182 {
183 ctr += 1;
184 match receiver.recv().await {
185 Ok(sample) => latest[ii] = FormulaValue::Sample(sample),
186 Err(e) => {
187 return Err(crate::Error::connection_failure(format!(
188 "Could not receive sample: {e}"
189 )));
190 }
191 };
192 }
193
194 if let FormulaValue::Sample(sample) = latest[ii]
195 && sample.timestamp() != max_ts
196 {
197 return Err(crate::Error::internal(format!(
198 "Could not synchronize receiver {} to the latest timestamp: {}",
199 ii, max_ts
200 )));
201 }
202 }
203
204 Ok(latest)
205}
206
207async fn synchronize_two_receivers<Q1: Quantity, Q2: Quantity>(
208 formula_item1: &mut FormulaOperand<Q1>,
209 formula_item2: &mut FormulaOperand<Q2>,
210) -> Result<(FormulaValue<Q1>, FormulaValue<Q2>), crate::Error> {
211 match (formula_item1, formula_item2) {
212 (FormulaOperand::Stream(rx1, _), FormulaOperand::Stream(rx2, _)) => {
213 let mut latest1 = rx1.recv().await.map_err(|e| {
214 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
215 })?;
216 let mut latest2 = rx2.recv().await.map_err(|e| {
217 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
218 })?;
219
220 let max_ts = latest1.timestamp().max(latest2.timestamp());
221
222 let mut ctr = 0;
223 while latest1.timestamp() != max_ts && ctr < 10 {
224 ctr += 1;
225 latest1 = rx1.recv().await.map_err(|e| {
226 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
227 })?;
228 }
229 if latest1.timestamp() != max_ts {
230 return Err(crate::Error::internal(format!(
231 "Could not synchronize receiver 1 to the latest timestamp: {}",
232 max_ts
233 )));
234 }
235
236 ctr = 0;
237 while latest2.timestamp() != max_ts && ctr < 10 {
238 ctr += 1;
239 latest2 = rx2.recv().await.map_err(|e| {
240 crate::Error::connection_failure(format!("Could not receive sample: {e}"))
241 })?;
242 }
243 if latest2.timestamp() != max_ts {
244 return Err(crate::Error::internal(format!(
245 "Could not synchronize receiver 2 to the latest timestamp: {}",
246 max_ts
247 )));
248 }
249 Ok((FormulaValue::Sample(latest1), FormulaValue::Sample(latest2)))
250 }
251 (FormulaOperand::Formula(..), _) | (_, FormulaOperand::Formula(..)) => {
252 Err(crate::Error::internal(
253 "Internal: synchronize_two_receivers called on unsubscribed FormulaItem."
254 .to_string(),
255 ))
256 }
257 (item1, item2) => Ok((
258 match item1.recv().await {
259 Ok(v) => v,
260 Err(e) => {
261 tracing::error!("Could not receive sample: {e}");
262 return Err(crate::Error::connection_failure(format!(
263 "Could not receive sample: {e}"
264 )));
265 }
266 },
267 match item2.recv().await {
268 Ok(v) => v,
269 Err(e) => {
270 tracing::error!("Could not receive sample: {e}");
271 return Err(crate::Error::connection_failure(format!(
272 "Could not receive sample: {e}"
273 )));
274 }
275 },
276 )),
277 }
278}
279
280async fn run_formula<Q: Quantity>(
281 mut formula_items: Vec<FormulaOperand<Q>>,
282 result_sender: broadcast::Sender<crate::Sample<Q>>,
283 apply_fn: fn(&[FormulaValue<Q>]) -> Option<crate::Sample<Q>>,
284) {
285 match synchronize_receivers(&mut formula_items).await {
286 Ok(latest) => {
287 let value = match apply_fn(&latest) {
288 Some(sample) => sample,
289 None => {
290 tracing::debug!(
291 "No value computed. Stopping processing. Input values: {:?}",
292 latest
293 );
294 return;
295 }
296 };
297 if let Err(err) = result_sender.send(value) {
298 tracing::debug!("No subscribers: {}. Stopping processing", err);
299 return;
300 }
301 }
302 Err(e) => {
303 tracing::error!(
304 "Couldn't synchronize receivers: {}. Stopping processing.",
305 e
306 );
307 return;
308 }
309 };
310 loop {
311 let mut latest = vec![];
312 for formula_item in formula_items.iter_mut() {
313 latest.push(match formula_item.recv().await {
314 Ok(value) => value,
315 Err(RecvError::Closed) => {
316 tracing::debug!("input receiver closed. stopping formula processing.");
317 return;
318 }
319 Err(RecvError::Lagged(count)) => {
320 tracing::warn!("input receiver lagged by {count} samples.");
321 continue;
322 }
323 });
324 }
325 if latest.is_empty() {
326 tracing::debug!("No active input receivers. Stopping processing.");
327 return;
328 }
329
330 let value = match apply_fn(&latest) {
331 Some(sample) => sample,
332 None => {
333 tracing::debug!(
334 "No value computed. Stopping processing. Input values: {:?}",
335 latest
336 );
337 return;
338 }
339 };
340 if let Err(err) = result_sender.send(value) {
341 tracing::debug!("No subscribers: {}. Stopping processing", err);
342 return;
343 }
344 }
345}
346
347async fn run_two_item_formula<QOut, QIn1, QIn2>(
348 mut formula_item1: FormulaOperand<QIn1>,
349 mut formula_item2: FormulaOperand<QIn2>,
350 result_sender: broadcast::Sender<crate::Sample<QOut>>,
351 apply_fn: fn(&FormulaValue<QIn1>, &FormulaValue<QIn2>) -> Option<crate::Sample<QOut>>,
352) where
353 QOut: Quantity,
354 QIn1: Quantity,
355 QIn2: Quantity,
356{
357 match synchronize_two_receivers(&mut formula_item1, &mut formula_item2).await {
358 Ok((value1, value2)) => {
359 let value = match apply_fn(&value1, &value2) {
360 Some(sample) => sample,
361 None => {
362 tracing::debug!(
363 "No value computed. Stopping processing. Input values: {:?}, {:?}",
364 value1,
365 value2
366 );
367 return;
368 }
369 };
370 if let Err(err) = result_sender.send(value) {
371 tracing::debug!("No subscribers: {}. Stopping processing", err);
372 return;
373 }
374 }
375 Err(e) => {
376 tracing::error!(
377 "Couldn't synchronize receivers: {}. Stopping processing.",
378 e
379 );
380 return;
381 }
382 }
383
384 loop {
385 let (value1, value2) = match (formula_item1.recv().await, formula_item2.recv().await) {
386 (Ok(v1), Ok(v2)) => (v1, v2),
387 (Err(RecvError::Closed), _) | (_, Err(RecvError::Closed)) => {
388 tracing::debug!("input receiver closed. stopping formula processing.");
389 return;
390 }
391 (Err(RecvError::Lagged(count)), _) | (_, Err(RecvError::Lagged(count))) => {
392 tracing::warn!("input receiver lagged by {count} samples.");
393 continue;
394 }
395 };
396
397 let result = match apply_fn(&value1, &value2) {
398 Some(sample) => sample,
399 None => {
400 tracing::error!(
401 "No value computed. Stopping processing. Input values: {:?}, {:?}",
402 value1,
403 value2
404 );
405 return;
406 }
407 };
408
409 if let Err(err) = result_sender.send(result) {
410 tracing::debug!("No subscribers: {}. Stopping processing", err);
411 return;
412 }
413 }
414}
415
416#[async_trait]
417impl<QOut, QIn1, QIn2> FormulaSubscriber for Formula<QOut, QIn1, QIn2>
418where
419 QOut: Quantity + 'static,
420 QIn1: Quantity + 'static + std::ops::Mul<QIn2, Output = QOut>,
421 QIn2: Quantity + 'static,
422 QIn1: std::ops::Div<QIn2, Output = QOut>,
426{
427 type QuantityType = QOut;
428
429 async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<QOut>>, Error> {
430 match &self {
431 Formula::Subscriber(formula) => (*formula).subscribe().await,
432 Formula::Coalesce(exprs)
433 | Formula::Min(exprs)
434 | Formula::Max(exprs)
435 | Formula::Avg(exprs)
436 | Formula::Add(exprs)
437 | Formula::Subtract(exprs) => {
438 let mut formula_items = Vec::new();
439 for expr in exprs {
440 formula_items.push(expr.subscribe().await?);
441 }
442
443 let (tx, rx) = broadcast::channel(100);
444 tokio::spawn(run_formula(
445 formula_items,
446 tx,
447 match self {
448 Formula::Coalesce(_) => application_methods::coalesce_samples,
449 Formula::Min(_) => application_methods::min_samples,
450 Formula::Max(_) => application_methods::max_samples,
451 Formula::Avg(_) => application_methods::avg_samples,
452 Formula::Add(_) => application_methods::add_samples,
453 Formula::Subtract(_) => application_methods::subtract_samples,
454 _ => {
455 return Err(Error::internal(
457 "unexpected formula type in subscribe.".to_string(),
458 ));
459 }
460 },
461 ));
462 Ok(rx)
463 }
464 Formula::Multiply(lhs, rhs) | Formula::Divide(lhs, rhs) => {
465 let lhs = lhs.subscribe().await?;
466 let rhs = rhs.subscribe().await?;
467
468 let (tx, rx) = broadcast::channel(100);
469 tokio::spawn(run_two_item_formula(
470 lhs,
471 rhs,
472 tx,
473 match self {
474 Formula::Multiply(_, _) => application_methods::multiply_samples,
475 Formula::Divide(_, _) => application_methods::divide_samples,
476 _ => {
477 return Err(Error::internal(
479 "unexpected formula type in subscribe.".to_string(),
480 ));
481 }
482 },
483 ));
484
485 Ok(rx)
486 }
487 }
488 }
489}