solverforge_core/constraints/
stream.rs

1use crate::constraints::{Collector, Joiner, NamedExpression, WasmFunction};
2use serde::{Deserialize, Serialize};
3
4/// A component in a constraint stream pipeline.
5///
6/// Constraint streams transform entity streams through filtering, joining,
7/// grouping, and finally scoring. Components chain together to form constraints.
8///
9/// # Pipeline Structure
10///
11/// A typical constraint pipeline:
12/// 1. **Source**: `for_each("Lesson")` - iterate over entities
13/// 2. **Filter**: `filter(predicate)` - exclude non-matching entities
14/// 3. **Join**: `join("Timeslot")` - combine with another entity type
15/// 4. **Score**: `penalize("1hard")` or `reward("1soft")` - apply scoring
16///
17/// # Example
18///
19/// ```
20/// use solverforge_core::constraints::StreamComponent;
21///
22/// // Simple penalty: penalize each lesson by 1 hard point
23/// let stream = vec![
24///     StreamComponent::for_each("Lesson"),
25///     StreamComponent::penalize("1hard"),
26/// ];
27///
28/// // Unique pair constraint: penalize conflicting lessons
29/// let conflict = vec![
30///     StreamComponent::for_each_unique_pair("Lesson"),
31///     StreamComponent::penalize("1hard"),
32/// ];
33/// ```
34///
35/// # Score Weights
36///
37/// Weights specify the penalty/reward magnitude:
38/// - `"1hard"` - 1 hard constraint violation
39/// - `"10soft"` - 10 soft constraint points
40/// - `"1hard/5soft"` - both hard and soft impact
41#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42#[serde(tag = "kind")]
43pub enum StreamComponent {
44    #[serde(rename = "forEach")]
45    ForEach {
46        #[serde(rename = "className")]
47        class_name: String,
48    },
49    #[serde(rename = "forEachIncludingUnassigned")]
50    ForEachIncludingUnassigned {
51        #[serde(rename = "className")]
52        class_name: String,
53    },
54    #[serde(rename = "forEachUniquePair")]
55    ForEachUniquePair {
56        #[serde(rename = "className")]
57        class_name: String,
58        #[serde(default, skip_serializing_if = "Vec::is_empty")]
59        joiners: Vec<Joiner>,
60    },
61    #[serde(rename = "filter")]
62    Filter { predicate: WasmFunction },
63    #[serde(rename = "join")]
64    Join {
65        #[serde(rename = "className")]
66        class_name: String,
67        #[serde(default, skip_serializing_if = "Vec::is_empty")]
68        joiners: Vec<Joiner>,
69    },
70    #[serde(rename = "ifExists")]
71    IfExists {
72        #[serde(rename = "className")]
73        class_name: String,
74        #[serde(default, skip_serializing_if = "Vec::is_empty")]
75        joiners: Vec<Joiner>,
76    },
77    #[serde(rename = "ifNotExists")]
78    IfNotExists {
79        #[serde(rename = "className")]
80        class_name: String,
81        #[serde(default, skip_serializing_if = "Vec::is_empty")]
82        joiners: Vec<Joiner>,
83    },
84    #[serde(rename = "ifExistsOther")]
85    IfExistsOther {
86        #[serde(rename = "className")]
87        class_name: String,
88        #[serde(default, skip_serializing_if = "Vec::is_empty")]
89        joiners: Vec<Joiner>,
90    },
91    #[serde(rename = "ifNotExistsOther")]
92    IfNotExistsOther {
93        #[serde(rename = "className")]
94        class_name: String,
95        #[serde(default, skip_serializing_if = "Vec::is_empty")]
96        joiners: Vec<Joiner>,
97    },
98    #[serde(rename = "ifExistsIncludingUnassigned")]
99    IfExistsIncludingUnassigned {
100        #[serde(rename = "className")]
101        class_name: String,
102        #[serde(default, skip_serializing_if = "Vec::is_empty")]
103        joiners: Vec<Joiner>,
104    },
105    #[serde(rename = "ifNotExistsIncludingUnassigned")]
106    IfNotExistsIncludingUnassigned {
107        #[serde(rename = "className")]
108        class_name: String,
109        #[serde(default, skip_serializing_if = "Vec::is_empty")]
110        joiners: Vec<Joiner>,
111    },
112    #[serde(rename = "groupBy")]
113    GroupBy {
114        #[serde(default, skip_serializing_if = "Vec::is_empty")]
115        keys: Vec<WasmFunction>,
116        #[serde(default, skip_serializing_if = "Vec::is_empty")]
117        aggregators: Vec<Collector>,
118    },
119    #[serde(rename = "map")]
120    Map {
121        #[serde(rename = "mapper")]
122        mappers: Vec<WasmFunction>,
123    },
124    #[serde(rename = "flattenLast")]
125    FlattenLast {
126        #[serde(skip_serializing_if = "Option::is_none")]
127        map: Option<WasmFunction>,
128    },
129    #[serde(rename = "expand")]
130    Expand {
131        #[serde(rename = "mapper")]
132        mappers: Vec<WasmFunction>,
133    },
134    #[serde(rename = "complement")]
135    Complement {
136        #[serde(rename = "className")]
137        class_name: String,
138    },
139    #[serde(rename = "penalize")]
140    Penalize {
141        weight: String,
142        #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
143        scale_by: Option<WasmFunction>,
144    },
145    #[serde(rename = "reward")]
146    Reward {
147        weight: String,
148        #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
149        scale_by: Option<WasmFunction>,
150    },
151    #[serde(rename = "concat")]
152    Concat {
153        #[serde(rename = "otherComponents")]
154        other_components: Vec<StreamComponent>,
155    },
156    #[serde(rename = "distinct")]
157    Distinct,
158    #[serde(rename = "impact")]
159    Impact {
160        weight: String,
161        #[serde(rename = "scaleBy", skip_serializing_if = "Option::is_none")]
162        scale_by: Option<WasmFunction>,
163    },
164    #[serde(rename = "indictWith")]
165    IndictWith {
166        #[serde(rename = "indictedObjectProvider")]
167        indicted_object_provider: WasmFunction,
168    },
169    #[serde(rename = "justifyWith")]
170    JustifyWith {
171        #[serde(rename = "justificationSupplier")]
172        justification_supplier: WasmFunction,
173    },
174}
175
176impl StreamComponent {
177    /// Iterates over all entities of the given class.
178    ///
179    /// This is the most common way to start a constraint stream.
180    /// Only considers entities with assigned planning variables.
181    ///
182    /// ```
183    /// use solverforge_core::constraints::StreamComponent;
184    ///
185    /// let source = StreamComponent::for_each("Lesson");
186    /// ```
187    pub fn for_each(class_name: impl Into<String>) -> Self {
188        StreamComponent::ForEach {
189            class_name: class_name.into(),
190        }
191    }
192
193    pub fn for_each_including_unassigned(class_name: impl Into<String>) -> Self {
194        StreamComponent::ForEachIncludingUnassigned {
195            class_name: class_name.into(),
196        }
197    }
198
199    /// Iterates over all unique pairs of entities of the given class.
200    ///
201    /// Use this for constraints that compare two entities of the same type,
202    /// like detecting room conflicts between lessons.
203    ///
204    /// ```
205    /// use solverforge_core::constraints::StreamComponent;
206    ///
207    /// // Pairs: (A,B), (A,C), (B,C) - no duplicates like (B,A)
208    /// let pairs = StreamComponent::for_each_unique_pair("Lesson");
209    /// ```
210    pub fn for_each_unique_pair(class_name: impl Into<String>) -> Self {
211        StreamComponent::ForEachUniquePair {
212            class_name: class_name.into(),
213            joiners: Vec::new(),
214        }
215    }
216
217    pub fn for_each_unique_pair_with_joiners(
218        class_name: impl Into<String>,
219        joiners: Vec<Joiner>,
220    ) -> Self {
221        StreamComponent::ForEachUniquePair {
222            class_name: class_name.into(),
223            joiners,
224        }
225    }
226
227    pub fn filter(predicate: WasmFunction) -> Self {
228        StreamComponent::Filter { predicate }
229    }
230
231    pub fn join(class_name: impl Into<String>) -> Self {
232        StreamComponent::Join {
233            class_name: class_name.into(),
234            joiners: Vec::new(),
235        }
236    }
237
238    pub fn join_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
239        StreamComponent::Join {
240            class_name: class_name.into(),
241            joiners,
242        }
243    }
244
245    pub fn if_exists(class_name: impl Into<String>) -> Self {
246        StreamComponent::IfExists {
247            class_name: class_name.into(),
248            joiners: Vec::new(),
249        }
250    }
251
252    pub fn if_exists_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
253        StreamComponent::IfExists {
254            class_name: class_name.into(),
255            joiners,
256        }
257    }
258
259    pub fn if_not_exists(class_name: impl Into<String>) -> Self {
260        StreamComponent::IfNotExists {
261            class_name: class_name.into(),
262            joiners: Vec::new(),
263        }
264    }
265
266    pub fn if_not_exists_with_joiners(class_name: impl Into<String>, joiners: Vec<Joiner>) -> Self {
267        StreamComponent::IfNotExists {
268            class_name: class_name.into(),
269            joiners,
270        }
271    }
272
273    pub fn if_exists_other(class_name: impl Into<String>) -> Self {
274        StreamComponent::IfExistsOther {
275            class_name: class_name.into(),
276            joiners: Vec::new(),
277        }
278    }
279
280    pub fn if_exists_other_with_joiners(
281        class_name: impl Into<String>,
282        joiners: Vec<Joiner>,
283    ) -> Self {
284        StreamComponent::IfExistsOther {
285            class_name: class_name.into(),
286            joiners,
287        }
288    }
289
290    pub fn if_not_exists_other(class_name: impl Into<String>) -> Self {
291        StreamComponent::IfNotExistsOther {
292            class_name: class_name.into(),
293            joiners: Vec::new(),
294        }
295    }
296
297    pub fn if_not_exists_other_with_joiners(
298        class_name: impl Into<String>,
299        joiners: Vec<Joiner>,
300    ) -> Self {
301        StreamComponent::IfNotExistsOther {
302            class_name: class_name.into(),
303            joiners,
304        }
305    }
306
307    pub fn if_exists_including_unassigned(class_name: impl Into<String>) -> Self {
308        StreamComponent::IfExistsIncludingUnassigned {
309            class_name: class_name.into(),
310            joiners: Vec::new(),
311        }
312    }
313
314    pub fn if_exists_including_unassigned_with_joiners(
315        class_name: impl Into<String>,
316        joiners: Vec<Joiner>,
317    ) -> Self {
318        StreamComponent::IfExistsIncludingUnassigned {
319            class_name: class_name.into(),
320            joiners,
321        }
322    }
323
324    pub fn if_not_exists_including_unassigned(class_name: impl Into<String>) -> Self {
325        StreamComponent::IfNotExistsIncludingUnassigned {
326            class_name: class_name.into(),
327            joiners: Vec::new(),
328        }
329    }
330
331    pub fn if_not_exists_including_unassigned_with_joiners(
332        class_name: impl Into<String>,
333        joiners: Vec<Joiner>,
334    ) -> Self {
335        StreamComponent::IfNotExistsIncludingUnassigned {
336            class_name: class_name.into(),
337            joiners,
338        }
339    }
340
341    pub fn group_by(keys: Vec<WasmFunction>, aggregators: Vec<Collector>) -> Self {
342        StreamComponent::GroupBy { keys, aggregators }
343    }
344
345    pub fn group_by_key(key: WasmFunction) -> Self {
346        StreamComponent::GroupBy {
347            keys: vec![key],
348            aggregators: Vec::new(),
349        }
350    }
351
352    pub fn group_by_collector(aggregator: Collector) -> Self {
353        StreamComponent::GroupBy {
354            keys: Vec::new(),
355            aggregators: vec![aggregator],
356        }
357    }
358
359    pub fn map(mappers: Vec<WasmFunction>) -> Self {
360        StreamComponent::Map { mappers }
361    }
362
363    pub fn map_single(mapper: WasmFunction) -> Self {
364        StreamComponent::Map {
365            mappers: vec![mapper],
366        }
367    }
368
369    pub fn flatten_last() -> Self {
370        StreamComponent::FlattenLast { map: None }
371    }
372
373    pub fn flatten_last_with_map(map: WasmFunction) -> Self {
374        StreamComponent::FlattenLast { map: Some(map) }
375    }
376
377    pub fn expand(mappers: Vec<WasmFunction>) -> Self {
378        StreamComponent::Expand { mappers }
379    }
380
381    pub fn complement(class_name: impl Into<String>) -> Self {
382        StreamComponent::Complement {
383            class_name: class_name.into(),
384        }
385    }
386
387    /// Penalizes matching entities by a fixed weight.
388    ///
389    /// The weight reduces the solution score. Higher penalties are worse.
390    ///
391    /// ```
392    /// use solverforge_core::constraints::StreamComponent;
393    ///
394    /// // Hard constraint: 1 point per violation
395    /// let hard = StreamComponent::penalize("1hard");
396    ///
397    /// // Soft constraint: 100 points per violation
398    /// let soft = StreamComponent::penalize("100soft");
399    /// ```
400    pub fn penalize(weight: impl Into<String>) -> Self {
401        StreamComponent::Penalize {
402            weight: weight.into(),
403            scale_by: None,
404        }
405    }
406
407    pub fn penalize_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
408        StreamComponent::Penalize {
409            weight: weight.into(),
410            scale_by: Some(scale_by),
411        }
412    }
413
414    /// Rewards matching entities by a fixed weight.
415    ///
416    /// The weight increases the solution score. Higher rewards are better.
417    ///
418    /// ```
419    /// use solverforge_core::constraints::StreamComponent;
420    ///
421    /// // Reward preferred assignments
422    /// let bonus = StreamComponent::reward("10soft");
423    /// ```
424    pub fn reward(weight: impl Into<String>) -> Self {
425        StreamComponent::Reward {
426            weight: weight.into(),
427            scale_by: None,
428        }
429    }
430
431    pub fn reward_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
432        StreamComponent::Reward {
433            weight: weight.into(),
434            scale_by: Some(scale_by),
435        }
436    }
437
438    pub fn concat(other_components: Vec<StreamComponent>) -> Self {
439        StreamComponent::Concat { other_components }
440    }
441
442    pub fn distinct() -> Self {
443        StreamComponent::Distinct
444    }
445
446    pub fn impact(weight: impl Into<String>) -> Self {
447        StreamComponent::Impact {
448            weight: weight.into(),
449            scale_by: None,
450        }
451    }
452
453    pub fn impact_with_weigher(weight: impl Into<String>, scale_by: WasmFunction) -> Self {
454        StreamComponent::Impact {
455            weight: weight.into(),
456            scale_by: Some(scale_by),
457        }
458    }
459
460    pub fn indict_with(indicted_object_provider: WasmFunction) -> Self {
461        StreamComponent::IndictWith {
462            indicted_object_provider,
463        }
464    }
465
466    pub fn indict_with_expr(indicted_object_provider: NamedExpression) -> Self {
467        StreamComponent::IndictWith {
468            indicted_object_provider: indicted_object_provider.into(),
469        }
470    }
471
472    pub fn justify_with(justification_supplier: WasmFunction) -> Self {
473        StreamComponent::JustifyWith {
474            justification_supplier,
475        }
476    }
477
478    pub fn justify_with_expr(justification_supplier: NamedExpression) -> Self {
479        StreamComponent::JustifyWith {
480            justification_supplier: justification_supplier.into(),
481        }
482    }
483
484    // ===== Expression-based convenience methods =====
485
486    /// Creates a filter component from a named expression.
487    ///
488    /// # Example
489    ///
490    /// ```
491    /// use solverforge_core::wasm::{Expr, FieldAccessExt};
492    /// use solverforge_core::constraints::{StreamComponent, IntoNamedExpression};
493    ///
494    /// let has_room = Expr::is_not_null(Expr::param(0).get("Lesson", "room"))
495    ///     .named_as("has_room");
496    /// let filter = StreamComponent::filter_expr(has_room);
497    /// assert!(matches!(filter, StreamComponent::Filter { .. }));
498    /// ```
499    pub fn filter_expr(expr: NamedExpression) -> Self {
500        StreamComponent::Filter {
501            predicate: expr.into(),
502        }
503    }
504
505    /// Creates a map component from named expressions.
506    pub fn map_expr(mappers: Vec<NamedExpression>) -> Self {
507        StreamComponent::Map {
508            mappers: mappers.into_iter().map(|e| e.into()).collect(),
509        }
510    }
511
512    /// Creates a single-mapper map component from a named expression.
513    pub fn map_single_expr(mapper: NamedExpression) -> Self {
514        StreamComponent::Map {
515            mappers: vec![mapper.into()],
516        }
517    }
518
519    /// Creates a groupBy component with expression-based key extractors.
520    pub fn group_by_expr(keys: Vec<NamedExpression>, aggregators: Vec<Collector>) -> Self {
521        StreamComponent::GroupBy {
522            keys: keys.into_iter().map(|e| e.into()).collect(),
523            aggregators,
524        }
525    }
526
527    /// Creates a groupBy component with a single expression-based key.
528    pub fn group_by_key_expr(key: NamedExpression) -> Self {
529        StreamComponent::GroupBy {
530            keys: vec![key.into()],
531            aggregators: Vec::new(),
532        }
533    }
534
535    /// Creates a penalize component with an expression-based weigher.
536    pub fn penalize_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
537        StreamComponent::Penalize {
538            weight: weight.into(),
539            scale_by: Some(scale_by.into()),
540        }
541    }
542
543    /// Creates a reward component with an expression-based weigher.
544    pub fn reward_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
545        StreamComponent::Reward {
546            weight: weight.into(),
547            scale_by: Some(scale_by.into()),
548        }
549    }
550
551    /// Creates a flattenLast component with an expression-based mapper.
552    pub fn flatten_last_with_expr(map: NamedExpression) -> Self {
553        StreamComponent::FlattenLast {
554            map: Some(map.into()),
555        }
556    }
557
558    /// Creates an expand component from named expressions.
559    pub fn expand_expr(mappers: Vec<NamedExpression>) -> Self {
560        StreamComponent::Expand {
561            mappers: mappers.into_iter().map(|e| e.into()).collect(),
562        }
563    }
564
565    /// Creates an impact component with an expression-based weigher.
566    pub fn impact_with_expr(weight: impl Into<String>, scale_by: NamedExpression) -> Self {
567        StreamComponent::Impact {
568            weight: weight.into(),
569            scale_by: Some(scale_by.into()),
570        }
571    }
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use crate::constraints::IntoNamedExpression;
    use crate::wasm::{Expr, FieldAccessExt};

    /// Serializes `component` to JSON, checks that parsing the JSON yields an
    /// equal component, and hands the JSON text back for further assertions.
    fn round_trip_json(component: &StreamComponent) -> String {
        let json = serde_json::to_string(component).unwrap();
        let parsed: StreamComponent = serde_json::from_str(&json).unwrap();
        assert_eq!(&parsed, component);
        json
    }

    // ===== Constructor tests =====

    #[test]
    fn test_for_each() {
        if let StreamComponent::ForEach { class_name } = StreamComponent::for_each("Lesson") {
            assert_eq!(class_name, "Lesson");
        } else {
            panic!("Expected ForEach");
        }
    }

    #[test]
    fn test_for_each_including_unassigned() {
        let built = StreamComponent::for_each_including_unassigned("Lesson");
        if let StreamComponent::ForEachIncludingUnassigned { class_name } = built {
            assert_eq!(class_name, "Lesson");
        } else {
            panic!("Expected ForEachIncludingUnassigned");
        }
    }

    #[test]
    fn test_for_each_unique_pair() {
        let built = StreamComponent::for_each_unique_pair("Lesson");
        if let StreamComponent::ForEachUniquePair {
            class_name,
            joiners,
        } = built
        {
            assert_eq!(class_name, "Lesson");
            assert!(joiners.is_empty());
        } else {
            panic!("Expected ForEachUniquePair");
        }
    }

    #[test]
    fn test_for_each_unique_pair_with_joiners() {
        let joiner = Joiner::equal(WasmFunction::new("get_timeslot"));
        let built = StreamComponent::for_each_unique_pair_with_joiners("Lesson", vec![joiner]);
        if let StreamComponent::ForEachUniquePair { joiners, .. } = built {
            assert_eq!(joiners.len(), 1);
        } else {
            panic!("Expected ForEachUniquePair");
        }
    }

    #[test]
    fn test_filter() {
        let built = StreamComponent::filter(WasmFunction::new("is_valid"));
        if let StreamComponent::Filter { predicate } = built {
            assert_eq!(predicate.name(), "is_valid");
        } else {
            panic!("Expected Filter");
        }
    }

    #[test]
    fn test_join() {
        if let StreamComponent::Join {
            class_name,
            joiners,
        } = StreamComponent::join("Room")
        {
            assert_eq!(class_name, "Room");
            assert!(joiners.is_empty());
        } else {
            panic!("Expected Join");
        }
    }

    #[test]
    fn test_join_with_joiners() {
        let joiner = Joiner::equal(WasmFunction::new("get_room"));
        let built = StreamComponent::join_with_joiners("Room", vec![joiner]);
        if let StreamComponent::Join { joiners, .. } = built {
            assert_eq!(joiners.len(), 1);
        } else {
            panic!("Expected Join");
        }
    }

    #[test]
    fn test_if_exists() {
        if let StreamComponent::IfExists { class_name, .. } = StreamComponent::if_exists("Conflict")
        {
            assert_eq!(class_name, "Conflict");
        } else {
            panic!("Expected IfExists");
        }
    }

    #[test]
    fn test_if_not_exists() {
        let built = StreamComponent::if_not_exists("Conflict");
        if let StreamComponent::IfNotExists { class_name, .. } = built {
            assert_eq!(class_name, "Conflict");
        } else {
            panic!("Expected IfNotExists");
        }
    }

    #[test]
    fn test_group_by() {
        let key_fns = vec![WasmFunction::new("get_room")];
        let collectors = vec![Collector::count()];
        let built = StreamComponent::group_by(key_fns, collectors);
        if let StreamComponent::GroupBy { keys, aggregators } = built {
            assert_eq!(keys.len(), 1);
            assert_eq!(aggregators.len(), 1);
        } else {
            panic!("Expected GroupBy");
        }
    }

    #[test]
    fn test_group_by_key() {
        let built = StreamComponent::group_by_key(WasmFunction::new("get_room"));
        if let StreamComponent::GroupBy { keys, aggregators } = built {
            assert_eq!(keys.len(), 1);
            assert!(aggregators.is_empty());
        } else {
            panic!("Expected GroupBy");
        }
    }

    #[test]
    fn test_group_by_collector() {
        let built = StreamComponent::group_by_collector(Collector::count());
        if let StreamComponent::GroupBy { keys, aggregators } = built {
            assert!(keys.is_empty());
            assert_eq!(aggregators.len(), 1);
        } else {
            panic!("Expected GroupBy");
        }
    }

    #[test]
    fn test_map() {
        let functions = vec![WasmFunction::new("get_a"), WasmFunction::new("get_b")];
        if let StreamComponent::Map { mappers } = StreamComponent::map(functions) {
            assert_eq!(mappers.len(), 2);
        } else {
            panic!("Expected Map");
        }
    }

    #[test]
    fn test_map_single() {
        let built = StreamComponent::map_single(WasmFunction::new("get_value"));
        if let StreamComponent::Map { mappers } = built {
            assert_eq!(mappers.len(), 1);
        } else {
            panic!("Expected Map");
        }
    }

    #[test]
    fn test_flatten_last() {
        if let StreamComponent::FlattenLast { map } = StreamComponent::flatten_last() {
            assert!(map.is_none());
        } else {
            panic!("Expected FlattenLast");
        }
    }

    #[test]
    fn test_flatten_last_with_map() {
        let built = StreamComponent::flatten_last_with_map(WasmFunction::new("get_items"));
        if let StreamComponent::FlattenLast { map } = built {
            assert!(map.is_some());
        } else {
            panic!("Expected FlattenLast");
        }
    }

    #[test]
    fn test_expand() {
        let built = StreamComponent::expand(vec![WasmFunction::new("get_extra")]);
        if let StreamComponent::Expand { mappers } = built {
            assert_eq!(mappers.len(), 1);
        } else {
            panic!("Expected Expand");
        }
    }

    #[test]
    fn test_complement() {
        let built = StreamComponent::complement("Timeslot");
        if let StreamComponent::Complement { class_name } = built {
            assert_eq!(class_name, "Timeslot");
        } else {
            panic!("Expected Complement");
        }
    }

    #[test]
    fn test_penalize() {
        if let StreamComponent::Penalize { weight, scale_by } = StreamComponent::penalize("1hard") {
            assert_eq!(weight, "1hard");
            assert!(scale_by.is_none());
        } else {
            panic!("Expected Penalize");
        }
    }

    #[test]
    fn test_penalize_with_weigher() {
        let weigher = WasmFunction::new("get_weight");
        let built = StreamComponent::penalize_with_weigher("1hard", weigher);
        if let StreamComponent::Penalize { weight, scale_by } = built {
            assert_eq!(weight, "1hard");
            assert!(scale_by.is_some());
        } else {
            panic!("Expected Penalize");
        }
    }

    #[test]
    fn test_reward() {
        if let StreamComponent::Reward { weight, scale_by } = StreamComponent::reward("1soft") {
            assert_eq!(weight, "1soft");
            assert!(scale_by.is_none());
        } else {
            panic!("Expected Reward");
        }
    }

    #[test]
    fn test_reward_with_weigher() {
        let weigher = WasmFunction::new("get_bonus");
        let built = StreamComponent::reward_with_weigher("1soft", weigher);
        if let StreamComponent::Reward { scale_by, .. } = built {
            assert!(scale_by.is_some());
        } else {
            panic!("Expected Reward");
        }
    }

    // ===== JSON serialization tests =====

    #[test]
    fn test_for_each_json_serialization() {
        let json = round_trip_json(&StreamComponent::for_each("Lesson"));
        assert!(json.contains("\"kind\":\"forEach\""));
        assert!(json.contains("\"className\":\"Lesson\""));
    }

    #[test]
    fn test_filter_json_serialization() {
        let json = round_trip_json(&StreamComponent::filter(WasmFunction::new("is_valid")));
        assert!(json.contains("\"kind\":\"filter\""));
        assert!(json.contains("\"predicate\":\"is_valid\""));
    }

    #[test]
    fn test_join_json_serialization() {
        let joiner = Joiner::equal(WasmFunction::new("get_room"));
        let json = round_trip_json(&StreamComponent::join_with_joiners("Room", vec![joiner]));
        assert!(json.contains("\"kind\":\"join\""));
        assert!(json.contains("\"className\":\"Room\""));
        assert!(json.contains("\"joiners\""));
    }

    #[test]
    fn test_group_by_json_serialization() {
        let key_fns = vec![WasmFunction::new("get_room")];
        let collectors = vec![Collector::count()];
        let json = round_trip_json(&StreamComponent::group_by(key_fns, collectors));
        assert!(json.contains("\"kind\":\"groupBy\""));
        assert!(json.contains("\"keys\""));
        assert!(json.contains("\"aggregators\""));
    }

    #[test]
    fn test_penalize_json_serialization() {
        let json = round_trip_json(&StreamComponent::penalize("1hard"));
        assert!(json.contains("\"kind\":\"penalize\""));
        assert!(json.contains("\"weight\":\"1hard\""));
    }

    // ===== Derived trait tests =====

    #[test]
    fn test_component_clone() {
        let original = StreamComponent::for_each("Lesson");
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_component_debug() {
        let rendered = format!("{:?}", StreamComponent::for_each("Lesson"));
        assert!(rendered.contains("ForEach"));
    }

    // ===== Expression-based method tests =====

    #[test]
    fn test_filter_expr() {
        let has_room = Expr::is_not_null(Expr::param(0).get("Lesson", "room")).named_as("has_room");

        if let StreamComponent::Filter { predicate } = StreamComponent::filter_expr(has_room) {
            assert_eq!(predicate.name(), "has_room");
        } else {
            panic!("Expected Filter");
        }
    }

    #[test]
    fn test_map_expr() {
        let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");
        let get_timeslot = Expr::param(0)
            .get("Lesson", "timeslot")
            .named_as("get_timeslot");

        let built = StreamComponent::map_expr(vec![get_room, get_timeslot]);
        if let StreamComponent::Map { mappers } = built {
            assert_eq!(mappers.len(), 2);
            assert_eq!(mappers[0].name(), "get_room");
            assert_eq!(mappers[1].name(), "get_timeslot");
        } else {
            panic!("Expected Map");
        }
    }

    #[test]
    fn test_map_single_expr() {
        let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");

        if let StreamComponent::Map { mappers } = StreamComponent::map_single_expr(get_room) {
            assert_eq!(mappers.len(), 1);
            assert_eq!(mappers[0].name(), "get_room");
        } else {
            panic!("Expected Map");
        }
    }

    #[test]
    fn test_group_by_expr() {
        let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");

        let built = StreamComponent::group_by_expr(vec![get_room], vec![Collector::count()]);
        if let StreamComponent::GroupBy { keys, aggregators } = built {
            assert_eq!(keys.len(), 1);
            assert_eq!(keys[0].name(), "get_room");
            assert_eq!(aggregators.len(), 1);
        } else {
            panic!("Expected GroupBy");
        }
    }

    #[test]
    fn test_group_by_key_expr() {
        let get_room = Expr::param(0).get("Lesson", "room").named_as("get_room");

        let built = StreamComponent::group_by_key_expr(get_room);
        if let StreamComponent::GroupBy { keys, aggregators } = built {
            assert_eq!(keys.len(), 1);
            assert!(aggregators.is_empty());
        } else {
            panic!("Expected GroupBy");
        }
    }

    #[test]
    fn test_penalize_with_expr() {
        let weight_fn = Expr::int(1).named_as("weight");

        let built = StreamComponent::penalize_with_expr("1hard", weight_fn);
        if let StreamComponent::Penalize { weight, scale_by } = built {
            assert_eq!(weight, "1hard");
            assert!(scale_by.is_some());
            let weigher = scale_by.unwrap();
            assert_eq!(weigher.name(), "weight");
        } else {
            panic!("Expected Penalize");
        }
    }

    #[test]
    fn test_reward_with_expr() {
        let bonus = Expr::int(10).named_as("bonus");

        let built = StreamComponent::reward_with_expr("1soft", bonus);
        if let StreamComponent::Reward { weight, scale_by } = built {
            assert_eq!(weight, "1soft");
            assert!(scale_by.is_some());
        } else {
            panic!("Expected Reward");
        }
    }
}
1048}