1use tokio::sync::broadcast;
7
8use crate::{
9 Error,
10 Sample,
11 logical_meter::formula::{
12 Formula,
13 FormulaSubscriber,
14 async_formula::FormulaOperand, },
16 quantity::Quantity, };
18
19impl<Q> Formula<Q>
20where
21 Q: Quantity + 'static,
22{
23 pub fn coalesce(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
24 match self {
25 Formula::Coalesce(mut items) => {
26 items.push(other.into());
27 Ok(Formula::Coalesce(items))
28 }
29 _ => Ok(Formula::Coalesce(vec![
30 FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
31 other.into(),
32 ])),
33 }
34 }
35
36 pub fn min(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
37 match self {
38 Formula::Min(mut items) => {
39 items.push(other.into());
40 Ok(Formula::Min(items))
41 }
42 _ => Ok(Formula::Min(vec![
43 FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
44 other.into(),
45 ])),
46 }
47 }
48
49 pub fn max(self, other: impl Into<FormulaOperand<Q>>) -> Result<Formula<Q>, Error> {
50 match self {
51 Formula::Max(mut items) => {
52 items.push(other.into());
53 Ok(Formula::Max(items))
54 }
55 _ => Ok(Formula::Max(vec![
56 FormulaOperand::Formula(Box::new(Formula::<Q, Q, f32>::Subscriber(Box::new(self)))),
57 other.into(),
58 ])),
59 }
60 }
61
62 pub fn avg(self, others: Vec<Formula<Q>>) -> Result<Formula<Q>, Error> {
63 let mut exprs: Vec<FormulaOperand<Q>> =
64 vec![FormulaOperand::Formula(Box::new(
65 Formula::<Q, Q, f32>::Subscriber(Box::new(self)),
66 ))];
67 for other in others {
68 exprs.push(other.into());
69 }
70 Ok(Formula::Avg(exprs))
71 }
72
73 pub async fn subscribe(&self) -> Result<broadcast::Receiver<Sample<Q>>, Error> {
74 <Self as FormulaSubscriber>::subscribe(self).await
75 }
76}
77
78impl<Q, F> std::ops::Add<F> for Formula<Q>
79where
80 F: Into<FormulaOperand<Q>>,
81 Q: Quantity + 'static,
82{
83 type Output = Formula<Q>;
84
85 fn add(self, other: F) -> Self::Output {
86 Formula::Add(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
87 }
88}
89
90impl<Q, F> std::ops::Sub<F> for Formula<Q>
91where
92 F: Into<FormulaOperand<Q>>,
93 Q: Quantity + 'static,
94{
95 type Output = Formula<Q>;
96
97 fn sub(self, other: F) -> Self::Output {
98 Formula::Subtract(vec![FormulaOperand::Formula(Box::new(self)), other.into()])
99 }
100}
101
102impl<Q> std::ops::Mul<f32> for Formula<Q, Q, f32>
103where
104 Q: Quantity + 'static,
105{
106 type Output = Formula<Q, Q, f32>;
107
108 fn mul(self, other: f32) -> Self::Output {
109 Formula::<Q, Q, f32>::Multiply(FormulaOperand::<Q>::Formula(Box::new(self)), other.into())
110 }
111}
112
113impl<Q> std::ops::Div<f32> for Formula<Q, Q, f32>
114where
115 Q: Quantity + 'static,
116{
117 type Output = Formula<Q, Q, f32>;
118
119 fn div(self, rhs: f32) -> Self::Output {
120 Formula::<Q, Q, f32>::Divide(FormulaOperand::<Q>::Formula(Box::new(self)), rhs.into())
121 }
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use crate::{client::test_utils::logging::capture_logs, quantity::Power};
128 use async_trait::async_trait;
129 use chrono::{DateTime, TimeDelta, Utc};
130
131 #[tokio::test]
132 async fn test_addition() {
133 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
134 + formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])
135 + formula(3, vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
136 assert_eq!(composed.to_string(), "((#1 + #2) + #3)");
137 assert_eq!(
138 collect_values(&composed, Power::as_watts).await,
139 vec![Some(13.0), Some(17.0), None, Some(29.0)]
140 );
141
142 let composed = formula(4, vec![None, Some(100.0), Some(-10.0), Some(5.0)]) + composed;
143 assert_eq!(composed.to_string(), "(#4 + ((#1 + #2) + #3))");
144 assert_eq!(
145 collect_values(&composed, Power::as_watts).await,
146 vec![None, Some(117.0), None, Some(34.0)]
147 );
148
149 let composed = formula(5, vec![None, Some(4.0), Some(-10.0), Some(-10.0)]) + composed;
150 assert_eq!(composed.to_string(), "(#5 + (#4 + ((#1 + #2) + #3)))");
151 assert_eq!(
152 collect_values(&composed, Power::as_watts).await,
153 vec![None, Some(121.0), None, Some(24.0)]
154 );
155
156 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
157 + Power::from_watts(5.0)
158 + formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
159 assert_eq!(composed.to_string(), "((#1 + 5 W) + #2)");
160 assert_eq!(
161 collect_values(&composed, Power::as_watts).await,
162 vec![Some(16.0), Some(19.0), None, Some(29.0)]
163 );
164 }
165
166 #[tokio::test]
167 async fn test_subtraction() {
168 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
169 - formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)])
170 - formula(3, vec![Some(2.0), Some(3.0), Some(4.0), Some(5.0)]);
171 assert_eq!(composed.to_string(), "((#1 - #2) - #3)");
172 assert_eq!(
173 collect_values(&composed, Power::as_watts).await,
174 vec![Some(7.0), Some(7.0), None, Some(11.0)]
175 );
176
177 let composed = formula(4, vec![None, Some(100.0), Some(-10.0), Some(5.0)]) - composed;
178 assert_eq!(composed.to_string(), "(#4 - ((#1 - #2) - #3))");
179 assert_eq!(
180 collect_values(&composed, Power::as_watts).await,
181 vec![None, Some(93.0), None, Some(-6.0)]
182 );
183
184 let composed = formula(5, vec![None, Some(4.0), Some(-10.0), Some(-10.0)]) - composed;
185 assert_eq!(composed.to_string(), "(#5 - (#4 - ((#1 - #2) - #3)))");
186 assert_eq!(
187 collect_values(&composed, Power::as_watts).await,
188 vec![None, Some(-89.0), None, Some(-4.0)]
189 );
190
191 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)])
192 - Power::from_watts(5.0)
193 - formula(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
194 assert_eq!(composed.to_string(), "((#1 - 5 W) - #2)");
195 assert_eq!(
196 collect_values(&composed, Power::as_watts).await,
197 vec![Some(4.0), Some(5.0), None, Some(11.0)]
198 );
199 }
200
201 #[tokio::test]
202 async fn test_multiplication() {
203 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) * 2.0;
204 assert_eq!(composed.to_string(), "(#1 * 2)");
205 assert_eq!(
206 collect_values(&composed, Power::as_watts).await,
207 vec![Some(20.0), Some(24.0), None, Some(40.0)]
208 );
209 }
210
211 #[tokio::test]
212 async fn test_division() {
213 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) / 2.0;
214 assert_eq!(composed.to_string(), "(#1 / 2)");
215 assert_eq!(
216 collect_values(&composed, Power::as_watts).await,
217 vec![Some(5.0), Some(6.0), None, Some(10.0)]
218 );
219
220 let composed = formula(1, vec![Some(10.0), Some(12.0), None, Some(20.0)]) / 0.0;
221 assert_eq!(composed.to_string(), "(#1 / 0)");
222 assert_eq!(
223 collect_values(&composed, Power::as_watts).await,
224 vec![None, None, None, None]
225 );
226 }
227
228 #[tokio::test]
229 async fn test_coalesce() {
230 let composed = formula(1, vec![None, Some(12.0), None, Some(20.0)])
231 .coalesce(formula(2, vec![Some(1.0), None, None, Some(4.0)]))
232 .unwrap()
233 .coalesce(formula(3, vec![Some(2.0), Some(3.0), Some(8.0), None]))
234 .unwrap();
235
236 assert_eq!(composed.to_string(), "COALESCE(#1, #2, #3)");
237 assert_eq!(
238 collect_values(&composed, Power::as_watts).await,
239 vec![Some(1.0), Some(12.0), Some(8.0), Some(20.0)]
240 );
241
242 let composed = formula(1, vec![None, Some(12.0), None, Some(20.0)])
243 .coalesce(Power::from_watts(5.0))
244 .unwrap();
245 assert_eq!(composed.to_string(), "COALESCE(#1, 5 W)");
246 assert_eq!(
247 collect_values(&composed, Power::as_watts).await,
248 vec![Some(5.0), Some(12.0), Some(5.0), Some(20.0)]
249 );
250 }
251
252 #[tokio::test]
253 async fn test_min() {
254 let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
255 .min(formula(
256 2,
257 vec![Some(5.0), Some(15.0), Some(3.0), Some(25.0)],
258 ))
259 .unwrap()
260 .min(formula(
261 3,
262 vec![Some(8.0), Some(11.0), Some(4.0), Some(18.0)],
263 ))
264 .unwrap();
265
266 assert_eq!(composed.to_string(), "MIN(#1, #2, #3)");
267 assert_eq!(
268 collect_values(&composed, Power::as_watts).await,
269 vec![Some(5.0), Some(1.0), None, Some(18.0)]
270 );
271
272 let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
273 .min(Power::from_watts(15.0))
274 .unwrap();
275 assert_eq!(composed.to_string(), "MIN(#1, 15 W)");
276 assert_eq!(
277 collect_values(&composed, Power::as_watts).await,
278 vec![Some(10.0), Some(1.0), None, Some(15.0)]
279 );
280 }
281
282 #[tokio::test]
283 async fn test_max() {
284 let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
285 .max(formula(
286 2,
287 vec![Some(5.0), Some(15.0), Some(3.0), Some(25.0)],
288 ))
289 .unwrap()
290 .max(formula(
291 3,
292 vec![Some(8.0), Some(21.0), Some(4.0), Some(18.0)],
293 ))
294 .unwrap();
295
296 assert_eq!(composed.to_string(), "MAX(#1, #2, #3)");
297 assert_eq!(
298 collect_values(&composed, Power::as_watts).await,
299 vec![Some(10.0), Some(21.0), None, Some(25.0)]
300 );
301
302 let composed = formula(1, vec![Some(10.0), Some(1.0), None, Some(20.0)])
303 .max(Power::from_watts(15.0))
304 .unwrap();
305 assert_eq!(composed.to_string(), "MAX(#1, 15 W)");
306 assert_eq!(
307 collect_values(&composed, Power::as_watts).await,
308 vec![Some(15.0), Some(15.0), None, Some(20.0)]
309 );
310 }
311
312 #[tokio::test]
313 async fn test_avg() {
314 let composed = formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0), None])
315 .avg(vec![
316 formula(2, vec![Some(20.0), Some(30.0), Some(40.0), None, None]),
317 formula(3, vec![Some(30.0), Some(40.0), Some(50.0), None, None]),
318 ])
319 .unwrap();
320
321 assert_eq!(composed.to_string(), "AVG(#1, #2, #3)");
322 assert_eq!(
323 collect_values(&composed, Power::as_watts).await,
324 vec![Some(20.0), Some(30.0), Some(45.0), Some(30.0), None]
325 );
326 }
327
328 #[tokio::test]
329 async fn test_mixed_operations() {
330 let composed = (formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0)])
331 + formula(2, vec![Some(5.0), Some(15.0), Some(25.0), Some(35.0)]))
332 .max(formula(
333 3,
334 vec![Some(12.0), Some(48.0), Some(22.0), Some(40.0)],
335 ))
336 .unwrap()
337 / 2.0;
338
339 assert_eq!(composed.to_string(), "(MAX((#1 + #2), #3) / 2)");
340 assert_eq!(
341 collect_values(&composed, Power::as_watts).await,
342 vec![Some(7.5), Some(24.0), None, Some(32.5)]
343 );
344
345 let composed = formula(1, vec![Some(100.0), Some(200.0), None, Some(300.0)])
346 .coalesce(formula(2, vec![Some(50.0), None, Some(25.0), Some(150.0)]))
347 .unwrap()
348 .min(formula(
349 3,
350 vec![Some(80.0), Some(180.0), Some(30.0), Some(250.0)],
351 ))
352 .unwrap();
353 assert_eq!(composed.to_string(), "MIN(COALESCE(#1, #2), #3)");
354 assert_eq!(
355 collect_values(&composed, Power::as_watts).await,
356 vec![Some(80.0), Some(180.0), Some(25.0), Some(250.0)]
357 );
358
359 let composed = formula(1, vec![Some(10.0), Some(20.0), None, Some(30.0)])
360 + formula(2, vec![Some(5.0), Some(15.0), Some(25.0), Some(35.0)])
361 - formula(3, vec![Some(3.0), Some(13.0), Some(23.0), Some(33.0)]) * 2.0;
362 assert_eq!(composed.to_string(), "((#1 + #2) - (#3 * 2))");
363 assert_eq!(
364 collect_values(&composed, Power::as_watts).await,
365 vec![Some(9.0), Some(9.0), None, Some(-1.0)]
366 );
367 }
368
369 #[tokio::test]
370 async fn test_out_of_sync_handling() {
371 let composed = formula(1, vec![Some(10.0), Some(20.0), Some(30.0), Some(40.0)])
372 + formula_out_of_sync(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
373
374 assert_eq!(composed.to_string(), "(#1 + #2)");
375 assert_eq!(
376 collect_values(&composed, Power::as_watts).await,
377 vec![Some(21.0), Some(32.0), Some(43.0)]
379 );
380 }
381
382 #[tokio::test]
383 async fn test_never_sync_handling() {
384 let composed = formula(1, vec![Some(10.0), Some(20.0), Some(30.0), Some(40.0)])
385 + formula_never_sync(2, vec![Some(1.0), Some(2.0), Some(3.0), Some(4.0)]);
386
387 assert_eq!(composed.to_string(), "(#1 + #2)");
388 let (collected_values, logs) =
389 capture_logs(|| collect_values(&composed, Power::as_watts)).await;
390 assert_eq!(
391 collected_values,
392 vec![]
394 );
395 assert_eq!(logs.len(), 1);
396 assert_eq!(
397 logs[0],
398 concat!(
399 "ERROR frequenz_microgrid::logical_meter::formula::async_formula: ",
400 "Couldn't synchronize receivers: ConnectionFailure: Could not receive sample: ",
401 "channel closed. Stopping processing."
402 )
403 );
404 }
405
406 fn formula(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power> {
407 Formula::<Power>::Subscriber(Box::new(MockFormulaSubscriber::new(
408 DateTime::<Utc>::UNIX_EPOCH,
409 format!("#{comp_id}"),
410 values,
411 )))
412 }
413
414 fn formula_out_of_sync(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power, Power, f32> {
415 Formula::<Power, Power, f32>::Subscriber(Box::new(MockFormulaSubscriber::new(
416 DateTime::<Utc>::UNIX_EPOCH + TimeDelta::milliseconds(200),
417 format!("#{comp_id}"),
418 values,
419 )))
420 }
421
422 fn formula_never_sync(comp_id: u64, values: Vec<Option<f32>>) -> Formula<Power, Power, f32> {
423 Formula::<Power, Power, f32>::Subscriber(Box::new(MockFormulaSubscriber::new(
424 DateTime::<Utc>::UNIX_EPOCH + TimeDelta::milliseconds(100),
425 format!("#{comp_id}"),
426 values,
427 )))
428 }
429
430 async fn collect_values<T, F>(formula: &Formula<T>, extractor: F) -> Vec<Option<f32>>
431 where
432 T: Quantity,
433 F: Fn(&T) -> f32,
434 {
435 let mut rx = formula.subscribe().await.unwrap();
436 let mut values = Vec::new();
437 while let Ok(sample) = rx.recv().await {
438 values.push(sample.value().map(|v| extractor(&v)));
439 }
440 values
441 }
442
443 #[derive(Clone)]
444 struct MockFormulaSubscriber {
445 start_time: chrono::DateTime<Utc>,
446 formula: String,
447 values: Vec<Option<f32>>,
448 }
449
450 impl MockFormulaSubscriber {
451 fn new(
452 start_time: chrono::DateTime<Utc>,
453 formula: String,
454 values: Vec<Option<f32>>,
455 ) -> Self {
456 Self {
457 start_time,
458 formula,
459 values,
460 }
461 }
462 }
463
464 impl std::fmt::Display for MockFormulaSubscriber {
465 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
466 write!(f, "{}", self.formula)
467 }
468 }
469
470 #[async_trait]
471 impl FormulaSubscriber for MockFormulaSubscriber {
472 type QuantityType = Power;
473
474 async fn subscribe(
475 &self,
476 ) -> Result<broadcast::Receiver<Sample<Self::QuantityType>>, Error> {
477 let (tx, rx) = broadcast::channel(10);
478 for (idx, &value) in self.values.iter().enumerate() {
479 let sample = Sample::new(
480 self.start_time + TimeDelta::new(0, (idx * 200_000_000) as u32).unwrap(),
481 value.map(Power::from_watts),
482 );
483 let _ = tx.send(sample);
484 }
485 Ok(rx)
486 }
487 }
488}