1use crate::constraints::{Collector, Joiner, NamedExpression, WasmFunction};
2use serde::{Deserialize, Serialize};
3
4#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "kind")]
43pub enum StreamComponent {
44 #[serde(rename = "forEach")]
45 ForEach {
46 #[serde(rename = "className")]
47 class_name: String,
48 },
49 #[serde(rename = "forEachIncludingUnassigned")]
50 ForEachIncludingUnassigned {
51 #[serde(rename = "className")]
52 class_name: String,
53 },
54 #[serde(rename = "forEachUniquePair")]
55 ForEachUniquePair {
56 #[serde(rename = "className")]
57 class_name: String,
58 #[serde(default, skip_serializing_if = "Vec::is_empty")]
59 joiners: Vec<Joiner>,
60 },
61 #[serde(rename = "filter")]
62 Filter { predicate: WasmFunction },
63 #[serde(rename = "join")]
64 Join {
65 #[serde(rename = "className")]
66 class_name: String,
67 #[serde(default, skip_serializing_if = "Vec::is_empty")]
68 joiners: Vec<Joiner>,
69 },
70 #[serde(rename = "ifExists")]
71 IfExists {
72 #[serde(rename = "className")]
73 class_name: String,
74 #[serde(default, skip_serializing_if = "Vec::is_empty")]
75 joiners: Vec<Joiner>,
76 },
77 #[serde(rename = "ifNotExists")]
78 IfNotExists {
79 #[serde(rename = "className")]
80 class_name: String,
81 #[serde(default, skip_serializing_if = "Vec::is_empty")]
82 joiners: Vec<Joiner>,
83 },
84 #[serde(rename = "ifExistsOther")]
85 IfExistsOther {
86 #[serde(rename = "className")]
87 class_name: String,
88 #[serde(default, skip_serializing_if = "Vec::is_empty")]
89 joiners: Vec<Joiner>,
90 },
91 #[serde(rename = "ifNotExistsOther")]
92 IfNotExistsOther {
93 #[serde(rename = "className")]
94 class_name: String,
95 #[serde(default, skip_serializing_if = "Vec::is_empty")]
96 joiners: Vec<Joiner>,
97 },
98 #[serde(rename = "ifExistsIncludingUnassigned")]
99 IfExistsIncludingUnassigned {
100 #[serde(rename = "className")]
101 class_name: String,
102 #[serde(default, skip_serializing_if = "Vec::is_empty")]
103 joiners: Vec<Joiner>,
104 },
105 #[serde(rename = "ifNotExistsIncludingUnassigned")]
106 IfNotExistsIncludingUnassigned {
107 #[serde(rename = "className")]
108 class_name: String,
109 #[serde(default, skip_serializing_if = "Vec::is_empty")]
110 joiners: Vec<Joiner>,
111 },
112 #[serde(rename = "groupBy")]
113 GroupBy {
114 #[serde(default, skip_serializing_if = "Vec::is_empty")]
115 keys: Vec<WasmFunction>,
116 #[serde(default, skip_serializing_if = "Vec::is_empty")]
117 aggregators: Vec<Collector>,
118 },
119 #[serde(rename = "map")]
120 Map {
121 #[serde(rename = "mapper")]
122 mappers: Vec<WasmFunction>,
123 },
124 #[serde(rename = "flattenLast")]
125 FlattenLast {
126 #[serde(skip_serializing_if = "Option::is_none")]
127 map: Option<WasmFunction>,
128 },
129 #[serde(rename = "expand")]
130 Expand {
131 #[serde(rename = "mapper")]
132 mappers: Vec<WasmFunction>,
133 },
134 #[serde(rename = "complement")]
135 Complement {
136 #[serde(rename = "className")]
137 class_name: String,
138 },
139 #[serde(rename = "penalize")]
140 Penalize {
141 weight: String,
142 #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
143 scale_by: Option<WasmFunction>,
144 },
145 #[serde(rename = "reward")]
146 Reward {
147 weight: String,
148 #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
149 scale_by: Option<WasmFunction>,
150 },
151 #[serde(rename = "concat")]
152 Concat {
153 #[serde(rename = "otherComponents")]
154 other_components: Vec<StreamComponent>,
155 },
156 #[serde(rename = "distinct")]
157 Distinct,
158 #[serde(rename = "impact")]
159 Impact {
160 weight: String,
161 #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
162 scale_by: Option<WasmFunction>,
163 },
164 #[serde(rename = "indictWith")]
165 IndictWith {
166 #[serde(rename = "indictedObjectProvider")]
167 indicted_object_provider: WasmFunction,
168 },
169 #[serde(rename = "justifyWith")]
170 JustifyWith {
171 #[serde(rename = "justificationSupplier")]
172 justification_supplier: WasmFunction,
173 },
174}
175
176impl StreamComponent {
177 pub fn for_each(class_name: impl Into<String>) -> Self {
188 StreamComponent::ForEach {
189 class_name: class_name.into(),
190 }
191 }
192
193 pub fn for_each_including_unassigned(class_name: impl Into<String>) -> Self {
194 StreamComponent::ForEachIncludingUnassigned {
195 class_name: class_name.into(),
196 }
197 }
198
199 pub fn for_each_unique_pair(class_name: impl Into<String>) -> Self {
211 StreamComponent::ForEachUniquePair {
212 class_name: class_name.into(),
213 joiners: Vec::new(),
214 }
215 }
216
217 pub fn for_each_unique_pair_with_joiners(
218 class_name: impl Into<String>,
219 joiners: Vec<Joiner>,
220 ) -> Self {
221 StreamComponent::ForEachUniquePair {
222 class_name: class_name.into(),
223 joiners,
224 }
225 }
226
227 pub fn filter(predicate: WasmFunction) -> Self {
228 StreamComponent::Filter { predicate }
229 }
230
231 pub fn join(class_name: impl Into<String>) -> Self {
232 StreamComponent::Join {
233 class_name: class_name.into(),
234 joiners: Vec::new(),
235 }
236 }
237
238 pub fn join_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
239 StreamComponent::Join {
240 class_name: class_name.into(),
241 joiners,
242 }
243 }
244
245 pub fn if_exists(class_name: impl Into<String>) -> Self {
246 StreamComponent::IfExists {
247 class_name: class_name.into(),
248 joiners: Vec::new(),
249 }
250 }
251
252 pub fn if_exists_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
253 StreamComponent::IfExists {
254 class_name: class_name.into(),
255 joiners,
256 }
257 }
258
259 pub fn if_not_exists(class_name: impl Into<String>) -> Self {
260 StreamComponent::IfNotExists {
261 class_name: class_name.into(),
262 joiners: Vec::new(),
263 }
264 }
265
266 pub fn if_not_exists_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
267 StreamComponent::IfNotExists {
268 class_name: class_name.into(),
269 joiners,
270 }
271 }
272
273 pub fn if_exists_other(class_name: impl Into<String>) -> Self {
274 StreamComponent::IfExistsOther {
275 class_name: class_name.into(),
276 joiners: Vec::new(),
277 }
278 }
279
280 pub fn if_exists_other_with_joiners(
281 class_name: impl Into<String>,
282 joiners: Vec<Joiner>,
283 ) -> Self {
284 StreamComponent::IfExistsOther {
285 class_name: class_name.into(),
286 joiners,
287 }
288 }
289
290 pub fn if_not_exists_other(class_name: impl Into<String>) -> Self {
291 StreamComponent::IfNotExistsOther {
292 class_name: class_name.into(),
293 joiners: Vec::new(),
294 }
295 }
296
297 pub fn if_not_exists_other_with_joiners(
298 class_name: impl Into<String>,
299 joiners: Vec<Joiner>,
300 ) -> Self {
301 StreamComponent::IfNotExistsOther {
302 class_name: class_name.into(),
303 joiners,
304 }
305 }
306
307 pub fn if_exists_including_unassigned(class_name: impl Into<String>) -> Self {
308 StreamComponent::IfExistsIncludingUnassigned {
309 class_name: class_name.into(),
310 joiners: Vec::new(),
311 }
312 }
313
314 pub fn if_exists_including_unassigned_with_joiners(
315 class_name: impl Into<String>,
316 joiners: Vec<Joiner>,
317 ) -> Self {
318 StreamComponent::IfExistsIncludingUnassigned {
319 class_name: class_name.into(),
320 joiners,
321 }
322 }
323
324 pub fn if_not_exists_including_unassigned(class_name: impl Into<String>) -> Self {
325 StreamComponent::IfNotExistsIncludingUnassigned {
326 class_name: class_name.into(),
327 joiners: Vec::new(),
328 }
329 }
330
331 pub fn if_not_exists_including_unassigned_with_joiners(
332 class_name: impl Into<String>,
333 joiners: Vec<Joiner>,
334 ) -> Self {
335 StreamComponent::IfNotExistsIncludingUnassigned {
336 class_name: class_name.into(),
337 joiners,
338 }
339 }
340
341 pub fn group_by(keys: Vec<WasmFunction>, aggregators: Vec<Collector>) -> Self {
342 StreamComponent::GroupBy { keys, aggregators }
343 }
344
345 pub fn group_by_key(key: WasmFunction) -> Self {
346 StreamComponent::GroupBy {
347 keys: vec![key],
348 aggregators: Vec::new(),
349 }
350 }
351
352 pub fn group_by_collector(aggregator: Collector) -> Self {
353 StreamComponent::GroupBy {
354 keys: Vec::new(),
355 aggregators: vec![aggregator],
356 }
357 }
358
359 pub fn map(mappers: Vec<WasmFunction>) -> Self {
360 StreamComponent::Map { mappers }
361 }
362
363 pub fn map_single(mapper: WasmFunction) -> Self {
364 StreamComponent::Map {
365 mappers: vec![mapper],
366 }
367 }
368
369 pub fn flatten_last() -> Self {
370 StreamComponent::FlattenLast { map: None }
371 }
372
373 pub fn flatten_last_with_map(map: WasmFunction) -> Self {
374 StreamComponent::FlattenLast { map: Some(map) }
375 }
376
377 pub fn expand(mappers: Vec<WasmFunction>) -> Self {
378 StreamComponent::Expand { mappers }
379 }
380
381 pub fn complement(class_name: impl Into<String>) -> Self {
382 StreamComponent::Complement {
383 class_name: class_name.into(),
384 }
385 }
386
387 pub fn penalize(weight: impl Into<String>) -> Self {
401 StreamComponent::Penalize {
402 weight: weight.into(),
403 scale_by: None,
404 }
405 }
406
407 pub fn penalize_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
408 StreamComponent::Penalize {
409 weight: weight.into(),
410 scale_by: Some(scale_by),
411 }
412 }
413
414 pub fn reward(weight: impl Into<String>) -> Self {
425 StreamComponent::Reward {
426 weight: weight.into(),
427 scale_by: None,
428 }
429 }
430
431 pub fn reward_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
432 StreamComponent::Reward {
433 weight: weight.into(),
434 scale_by: Some(scale_by),
435 }
436 }
437
438 pub fn concat(other_components: Vec<StreamComponent>) -> Self {
439 StreamComponent::Concat { other_components }
440 }
441
442 pub fn distinct() -> Self {
443 StreamComponent::Distinct
444 }
445
446 pub fn impact(weight: impl Into<String>) -> Self {
447 StreamComponent::Impact {
448 weight: weight.into(),
449 scale_by: None,
450 }
451 }
452
453 pub fn impact_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
454 StreamComponent::Impact {
455 weight: weight.into(),
456 scale_by: Some(scale_by),
457 }
458 }
459
460 pub fn indict_with(indicted_object_provider: WasmFunction) -> Self {
461 StreamComponent::IndictWith {
462 indicted_object_provider,
463 }
464 }
465
466 pub fn indict_with_expr(indicted_object_provider: NamedExpression) -> Self {
467 StreamComponent::IndictWith {
468 indicted_object_provider: indicted_object_provider.into(),
469 }
470 }
471
472 pub fn justify_with(justification_supplier: WasmFunction) -> Self {
473 StreamComponent::JustifyWith {
474 justification_supplier,
475 }
476 }
477
478 pub fn justify_with_expr(justification_supplier: NamedExpression) -> Self {
479 StreamComponent::JustifyWith {
480 justification_supplier: justification_supplier.into(),
481 }
482 }
483
484 pub fn filter_expr(expr: NamedExpression) -> Self {
500 StreamComponent::Filter {
501 predicate: expr.into(),
502 }
503 }
504
505 pub fn map_expr(mappers: Vec<NamedExpression>) -> Self {
507 StreamComponent::Map {
508 mappers: mappers.into_iter().map(|e| e.into()).collect(),
509 }
510 }
511
512 pub fn map_single_expr(mapper: NamedExpression) -> Self {
514 StreamComponent::Map {
515 mappers: vec![mapper.into()],
516 }
517 }
518
519 pub fn group_by_expr(keys: Vec<NamedExpression>, aggregators: Vec<Collector>) -> Self {
521 StreamComponent::GroupBy {
522 keys: keys.into_iter().map(|e| e.into()).collect(),
523 aggregators,
524 }
525 }
526
527 pub fn group_by_key_expr(key: NamedExpression) -> Self {
529 StreamComponent::GroupBy {
530 keys: vec![key.into()],
531 aggregators: Vec::new(),
532 }
533 }
534
535 pub fn penalize_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
537 StreamComponent::Penalize {
538 weight: weight.into(),
539 scale_by: Some(scale_by.into()),
540 }
541 }
542
543 pub fn reward_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
545 StreamComponent::Reward {
546 weight: weight.into(),
547 scale_by: Some(scale_by.into()),
548 }
549 }
550
551 pub fn flatten_last_with_expr(map: NamedExpression) -> Self {
553 StreamComponent::FlattenLast {
554 map: Some(map.into()),
555 }
556 }
557
558 pub fn expand_expr(mappers: Vec<NamedExpression>) -> Self {
560 StreamComponent::Expand {
561 mappers: mappers.into_iter().map(|e| e.into()).collect(),
562 }
563 }
564
565 pub fn impact_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
567 StreamComponent::Impact {
568 weight: weight.into(),
569 scale_by: Some(scale_by.into()),
570 }
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577
578 #[test]
579 fn test_for_each() {
580 let component = StreamComponent::for_each("Lesson");
581 match component {
582 StreamComponent::ForEach { class_name } => {
583 assert_eq!(class_name, "Lesson");
584 }
585 _ => panic!("Expected ForEach"),
586 }
587 }
588
589 #[test]
590 fn test_for_each_including_unassigned() {
591 let component = StreamComponent::for_each_including_unassigned("Lesson");
592 match component {
593 StreamComponent::ForEachIncludingUnassigned { class_name } => {
594 assert_eq!(class_name, "Lesson");
595 }
596 _ => panic!("Expected ForEachIncludingUnassigned"),
597 }
598 }
599
600 #[test]
601 fn test_for_each_unique_pair() {
602 let component = StreamComponent::for_each_unique_pair("Lesson");
603 match component {
604 StreamComponent::ForEachUniquePair {
605 class_name,
606 joiners,
607 } => {
608 assert_eq!(class_name, "Lesson");
609 assert!(joiners.is_empty());
610 }
611 _ => panic!("Expected ForEachUniquePair"),
612 }
613 }
614
615 #[test]
616 fn test_for_each_unique_pair_with_joiners() {
617 let component = StreamComponent::for_each_unique_pair_with_joiners(
618 "Lesson",
619 vec![Joiner::equal(WasmFunction::new("get_timeslot"))],
620 );
621 match component {
622 StreamComponent::ForEachUniquePair { joiners, .. } => {
623 assert_eq!(joiners.len(), 1);
624 }
625 _ => panic!("Expected ForEachUniquePair"),
626 }
627 }
628
629 #[test]
630 fn test_filter() {
631 let component = StreamComponent::filter(WasmFunction::new("is_valid"));
632 match component {
633 StreamComponent::Filter { predicate } => {
634 assert_eq!(predicate.name(), "is_valid");
635 }
636 _ => panic!("Expected Filter"),
637 }
638 }
639
640 #[test]
641 fn test_join() {
642 let component = StreamComponent::join("Room");
643 match component {
644 StreamComponent::Join {
645 class_name,
646 joiners,
647 } => {
648 assert_eq!(class_name, "Room");
649 assert!(joiners.is_empty());
650 }
651 _ => panic!("Expected Join"),
652 }
653 }
654
655 #[test]
656 fn test_join_with_joiners() {
657 let component = StreamComponent::join_with_joiners(
658 "Room",
659 vec![Joiner::equal(WasmFunction::new("get_room"))],
660 );
661 match component {
662 StreamComponent::Join { joiners, .. } => {
663 assert_eq!(joiners.len(), 1);
664 }
665 _ => panic!("Expected Join"),
666 }
667 }
668
669 #[test]
670 fn test_if_exists() {
671 let component = StreamComponent::if_exists("Conflict");
672 match component {
673 StreamComponent::IfExists { class_name, .. } => {
674 assert_eq!(class_name, "Conflict");
675 }
676 _ => panic!("Expected IfExists"),
677 }
678 }
679
680 #[test]
681 fn test_if_not_exists() {
682 let component = StreamComponent::if_not_exists("Conflict");
683 match component {
684 StreamComponent::IfNotExists { class_name, .. } => {
685 assert_eq!(class_name, "Conflict");
686 }
687 _ => panic!("Expected IfNotExists"),
688 }
689 }
690
691 #[test]
692 fn test_group_by() {
693 let component = StreamComponent::group_by(
694 vec![WasmFunction::new("get_room")],
695 vec![Collector::count()],
696 );
697 match component {
698 StreamComponent::GroupBy { keys, aggregators } => {
699 assert_eq!(keys.len(), 1);
700 assert_eq!(aggregators.len(), 1);
701 }
702 _ => panic!("Expected GroupBy"),
703 }
704 }
705
706 #[test]
707 fn test_group_by_key() {
708 let component = StreamComponent::group_by_key(WasmFunction::new("get_room"));
709 match component {
710 StreamComponent::GroupBy { keys, aggregators } => {
711 assert_eq!(keys.len(), 1);
712 assert!(aggregators.is_empty());
713 }
714 _ => panic!("Expected GroupBy"),
715 }
716 }
717
718 #[test]
719 fn test_group_by_collector() {
720 let component = StreamComponent::group_by_collector(Collector::count());
721 match component {
722 StreamComponent::GroupBy { keys, aggregators } => {
723 assert!(keys.is_empty());
724 assert_eq!(aggregators.len(), 1);
725 }
726 _ => panic!("Expected GroupBy"),
727 }
728 }
729
730 #[test]
731 fn test_map() {
732 let component =
733 StreamComponent::map(vec![WasmFunction::new("get_a"), WasmFunction::new("get_b")]);
734 match component {
735 StreamComponent::Map { mappers } => {
736 assert_eq!(mappers.len(), 2);
737 }
738 _ => panic!("Expected Map"),
739 }
740 }
741
742 #[test]
743 fn test_map_single() {
744 let component = StreamComponent::map_single(WasmFunction::new("get_value"));
745 match component {
746 StreamComponent::Map { mappers } => {
747 assert_eq!(mappers.len(), 1);
748 }
749 _ => panic!("Expected Map"),
750 }
751 }
752
753 #[test]
754 fn test_flatten_last() {
755 let component = StreamComponent::flatten_last();
756 match component {
757 StreamComponent::FlattenLast { map } => {
758 assert!(map.is_none());
759 }
760 _ => panic!("Expected FlattenLast"),
761 }
762 }
763
764 #[test]
765 fn test_flatten_last_with_map() {
766 let component = StreamComponent::flatten_last_with_map(WasmFunction::new("get_items"));
767 match component {
768 StreamComponent::FlattenLast { map } => {
769 assert!(map.is_some());
770 }
771 _ => panic!("Expected FlattenLast"),
772 }
773 }
774
775 #[test]
776 fn test_expand() {
777 let component = StreamComponent::expand(vec![WasmFunction::new("get_extra")]);
778 match component {
779 StreamComponent::Expand { mappers } => {
780 assert_eq!(mappers.len(), 1);
781 }
782 _ => panic!("Expected Expand"),
783 }
784 }
785
786 #[test]
787 fn test_complement() {
788 let component = StreamComponent::complement("Timeslot");
789 match component {
790 StreamComponent::Complement { class_name } => {
791 assert_eq!(class_name, "Timeslot");
792 }
793 _ => panic!("Expected Complement"),
794 }
795 }
796
797 #[test]
798 fn test_penalize() {
799 let component = StreamComponent::penalize("1hard");
800 match component {
801 StreamComponent::Penalize { weight, scale_by } => {
802 assert_eq!(weight, "1hard");
803 assert!(scale_by.is_none());
804 }
805 _ => panic!("Expected Penalize"),
806 }
807 }
808
809 #[test]
810 fn test_penalize_with_weigher() {
811 let component =
812 StreamComponent::penalize_with_weigher("1hard", WasmFunction::new("get_weight"));
813 match component {
814 StreamComponent::Penalize { weight, scale_by } => {
815 assert_eq!(weight, "1hard");
816 assert!(scale_by.is_some());
817 }
818 _ => panic!("Expected Penalize"),
819 }
820 }
821
822 #[test]
823 fn test_reward() {
824 let component = StreamComponent::reward("1soft");
825 match component {
826 StreamComponent::Reward { weight, scale_by } => {
827 assert_eq!(weight, "1soft");
828 assert!(scale_by.is_none());
829 }
830 _ => panic!("Expected Reward"),
831 }
832 }
833
834 #[test]
835 fn test_reward_with_weigher() {
836 let component =
837 StreamComponent::reward_with_weigher("1soft", WasmFunction::new("get_bonus"));
838 match component {
839 StreamComponent::Reward { scale_by, .. } => {
840 assert!(scale_by.is_some());
841 }
842 _ => panic!("Expected Reward"),
843 }
844 }
845
846 #[test]
847 fn test_for_each_json_serialization() {
848 let component = StreamComponent::for_each("Lesson");
849 let json = serde_json::to_string(&component).unwrap();
850 assert!(json.contains("\"kind\":\"forEach\""));
851 assert!(json.contains("\"className\":\"Lesson\""));
852
853 let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
854 assert_eq!(parsed, component);
855 }
856
857 #[test]
858 fn test_filter_json_serialization() {
859 let component = StreamComponent::filter(WasmFunction::new("is_valid"));
860 let json = serde_json::to_string(&component).unwrap();
861 assert!(json.contains("\"kind\":\"filter\""));
862 assert!(json.contains("\"predicate\":\"is_valid\""));
863
864 let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
865 assert_eq!(parsed, component);
866 }
867
868 #[test]
869 fn test_join_json_serialization() {
870 let component = StreamComponent::join_with_joiners(
871 "Room",
872 vec![Joiner::equal(WasmFunction::new("get_room"))],
873 );
874 let json = serde_json::to_string(&component).unwrap();
875 assert!(json.contains("\"kind\":\"join\""));
876 assert!(json.contains("\"className\":\"Room\""));
877 assert!(json.contains("\"joiners\""));
878
879 let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
880 assert_eq!(parsed, component);
881 }
882
883 #[test]
884 fn test_group_by_json_serialization() {
885 let component = StreamComponent::group_by(
886 vec![WasmFunction::new("get_room")],
887 vec![Collector::count()],
888 );
889 let json = serde_json::to_string(&component).unwrap();
890 assert!(json.contains("\"kind\":\"groupBy\""));
891 assert!(json.contains("\"keys\""));
892 assert!(json.contains("\"aggregators\""));
893
894 let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
895 assert_eq!(parsed, component);
896 }
897
898 #[test]
899 fn test_penalize_json_serialization() {
900 let component = StreamComponent::penalize("1hard");
901 let json = serde_json::to_string(&component).unwrap();
902 assert!(json.contains("\"kind\":\"penalize\""));
903 assert!(json.contains("\"weight\":\"1hard\""));
904
905 let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
906 assert_eq!(parsed, component);
907 }
908
909 #[test]
910 fn test_component_clone() {
911 let component = StreamComponent::for_each("Lesson");
912 let cloned = component.clone();
913 assert_eq!(component, cloned);
914 }
915
916 #[test]
917 fn test_component_debug() {
918 let component = StreamComponent::for_each("Lesson");
919 let debug = format!("{:?}", component);
920 assert!(debug.contains("ForEach"));
921 }
922
923 #[test]
926 fn test_filter_expr() {
927 use crate::constraints::IntoNamedExpression;
928 use crate::wasm::{Expr, FieldAccessExt};
929
930 let has_room = Expr::is_not_null(Expr::param(0).get("Lesson", "room")).named_as("has_room");
931 let component = StreamComponent::filter_expr(has_room);
932
933 match component {
934 StreamComponent::Filter { predicate } => {
935 assert_eq!(predicate.name(), "has_room");
936 }
937 _ => panic!("Expected Filter"),
938 }
939 }
940
941 #[test]
942 fn test_map_expr() {
943 use crate::constraints::IntoNamedExpression;
944 use crate::wasm::{Expr, FieldAccessExt};
945
946 let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");
947 let get_timeslot = Expr::param(0)
948 .get("Lesson", "timeslot")
949 .named_as("get_timeslot");
950 let component = StreamComponent::map_expr(vec![get_room, get_timeslot]);
951
952 match component {
953 StreamComponent::Map { mappers } => {
954 assert_eq!(mappers.len(), 2);
955 assert_eq!(mappers[0].name(), "get_room");
956 assert_eq!(mappers[1].name(), "get_timeslot");
957 }
958 _ => panic!("Expected Map"),
959 }
960 }
961
962 #[test]
963 fn test_map_single_expr() {
964 use crate::constraints::IntoNamedExpression;
965 use crate::wasm::{Expr, FieldAccessExt};
966
967 let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");
968 let component = StreamComponent::map_single_expr(get_room);
969
970 match component {
971 StreamComponent::Map { mappers } => {
972 assert_eq!(mappers.len(), 1);
973 assert_eq!(mappers[0].name(), "get_room");
974 }
975 _ => panic!("Expected Map"),
976 }
977 }
978
979 #[test]
980 fn test_group_by_expr() {
981 use crate::constraints::IntoNamedExpression;
982 use crate::wasm::{Expr, FieldAccessExt};
983
984 let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");
985 let component = StreamComponent::group_by_expr(vec![get_room], vec![Collector::count()]);
986
987 match component {
988 StreamComponent::GroupBy { keys, aggregators } => {
989 assert_eq!(keys.len(), 1);
990 assert_eq!(keys[0].name(), "get_room");
991 assert_eq!(aggregators.len(), 1);
992 }
993 _ => panic!("Expected GroupBy"),
994 }
995 }
996
997 #[test]
998 fn test_group_by_key_expr() {
999 use crate::constraints::IntoNamedExpression;
1000 use crate::wasm::{Expr, FieldAccessExt};
1001
1002 let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");
1003 let component = StreamComponent::group_by_key_expr(get_room);
1004
1005 match component {
1006 StreamComponent::GroupBy { keys, aggregators } => {
1007 assert_eq!(keys.len(), 1);
1008 assert!(aggregators.is_empty());
1009 }
1010 _ => panic!("Expected GroupBy"),
1011 }
1012 }
1013
1014 #[test]
1015 fn test_penalize_with_expr() {
1016 use crate::constraints::IntoNamedExpression;
1017 use crate::wasm::Expr;
1018
1019 let weight_fn = Expr::int(1).named_as("weight");
1020 let component = StreamComponent::penalize_with_expr("1hard", weight_fn);
1021
1022 match component {
1023 StreamComponent::Penalize { weight, scale_by } => {
1024 assert_eq!(weight, "1hard");
1025 assert!(scale_by.is_some());
1026 assert_eq!(scale_by.unwrap().name(), "weight");
1027 }
1028 _ => panic!("Expected Penalize"),
1029 }
1030 }
1031
1032 #[test]
1033 fn test_reward_with_expr() {
1034 use crate::constraints::IntoNamedExpression;
1035 use crate::wasm::Expr;
1036
1037 let bonus = Expr::int(10).named_as("bonus");
1038 let component = StreamComponent::reward_with_expr("1soft", bonus);
1039
1040 match component {
1041 StreamComponent::Reward { weight, scale_by } => {
1042 assert_eq!(weight, "1soft");
1043 assert!(scale_by.is_some());
1044 }
1045 _ => panic!("Expected Reward"),
1046 }
1047 }
1048}