1use std::collections::HashMap;
5
6use reifydb_core::{
7 CommitVersion,
8 interface::{FlowId, SourceId},
9};
10
11use crate::{
12 flow::{FlowChange, FlowDiff},
13 worker::{UnitOfWork, UnitsOfWork},
14};
15
16impl crate::engine::FlowEngine {
17 pub fn create_partition(
30 &self,
31 changes_by_version: HashMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
32 ) -> UnitsOfWork {
33 let mut all_units_by_flow: HashMap<FlowId, Vec<UnitOfWork>> = HashMap::new();
34
35 let mut versions: Vec<_> = changes_by_version.keys().copied().collect();
37 versions.sort();
38
39 for version in versions {
40 let changes = changes_by_version.get(&version).unwrap();
41
42 let mut changes_by_source: HashMap<SourceId, Vec<FlowDiff>> = HashMap::new();
44 for (source_id, diffs) in changes {
45 changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
46 }
47
48 let version_units = self.partition_into_units_of_work(changes_by_source, version);
50
51 for flow_units in version_units.into_inner() {
54 for unit in flow_units {
55 all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
56 }
57 }
58 }
59
60 let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
62
63 {
66 use std::collections::HashSet;
67 let mut seen_flows = HashSet::new();
68
69 for flow_units in &units_vec {
70 assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
71
72 let flow_id = flow_units[0].flow_id;
73 assert!(
74 !seen_flows.contains(&flow_id),
75 "INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
76 This means the same flow will be processed by multiple parallel tasks, \
77 causing keyspace overlap.",
78 flow_id
79 );
80
81 for unit in flow_units {
83 assert_eq!(
84 unit.flow_id, flow_id,
85 "INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
86 All units in a Vec must belong to the same flow.",
87 flow_id, unit.flow_id
88 );
89 }
90
91 seen_flows.insert(flow_id);
92 }
93 }
94
95 UnitsOfWork::new(units_vec)
96 }
97
98 fn partition_into_units_of_work(
99 &self,
100 changes_by_source: HashMap<SourceId, Vec<FlowDiff>>,
101 version: CommitVersion,
102 ) -> UnitsOfWork {
103 let mut flow_changes: HashMap<FlowId, Vec<FlowChange>> = HashMap::new();
105
106 let sources = self.inner.sources.read();
108
109 for (source_id, diffs) in changes_by_source {
111 if let Some(subscriptions) = sources.get(&source_id) {
113 for (flow_id, node_id) in subscriptions {
114 let change = FlowChange::internal(*node_id, version, diffs.clone());
118 flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
119 }
120 }
121 }
122
123 drop(sources);
124
125 let units: Vec<Vec<UnitOfWork>> = flow_changes
128 .into_iter()
129 .map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
130 .collect();
131
132 UnitsOfWork::new(units)
133 }
134}
135
#[cfg(test)]
mod tests {
	use std::{
		collections::{BTreeMap, HashMap, HashSet},
		sync::Arc,
	};

	use parking_lot::RwLock;
	use rand::{rng, seq::SliceRandom};
	use reifydb_core::{
		CommitVersion, Row,
		event::EventBus,
		interface::{FlowId, FlowNodeId, SourceId, TableId},
		util::CowVec,
		value::encoded::{EncodedValues, EncodedValuesNamedLayout},
	};
	use reifydb_engine::{StandardRowEvaluator, execute::Executor};
	use reifydb_rql::flow::FlowGraphAnalyzer;
	use reifydb_type::{RowNumber, Type};

	use crate::{
		engine::{FlowEngine, FlowEngineInner},
		flow::FlowDiff,
		operator::transform::registry::TransformOperatorRegistry,
		worker::{UnitOfWork, UnitsOfWork},
	};

	/// Builds the engine's subscription table from a `source -> flows` map.
	///
	/// Each subscription gets the synthetic node id `source * 1000 + flow`.
	/// `snapshot_unit` relies on this to recover the originating source from an
	/// internal change. Only `SourceId::Table` sources are supported.
	fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
		let mut sources_map = HashMap::new();
		for (source_id, flows) in subscriptions {
			let source_num = match source_id {
				SourceId::Table(tid) => tid.0,
				_ => panic!("Only Table sources supported in tests"),
			};
			let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
				.into_iter()
				.map(|flow_id| (flow_id, FlowNodeId(source_num * 1000 + flow_id.0)))
				.collect();
			sources_map.insert(source_id, subscriptions_with_nodes);
		}
		sources_map
	}

	/// Creates a `FlowEngine` whose only populated state is the given source subscriptions.
	fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
		let evaluator = StandardRowEvaluator::default();
		let executor = Executor::testing();
		let registry = TransformOperatorRegistry::new();

		let sources = mk_sources(subscriptions);

		let inner = FlowEngineInner {
			evaluator,
			executor,
			registry,
			operators: RwLock::new(HashMap::new()),
			flows: RwLock::new(HashMap::new()),
			sources: RwLock::new(sources),
			sinks: RwLock::new(HashMap::new()),
			analyzer: RwLock::new(FlowGraphAnalyzer::new()),
			event_bus: EventBus::new(),
		};

		FlowEngine {
			inner: Arc::new(inner),
		}
	}

	/// Creates an insert diff whose row is derived from `label`.
	fn mk_diff(label: &str) -> FlowDiff {
		let row = mk_row(label);
		FlowDiff::Insert {
			post: row,
		}
	}

	/// Creates a dummy row. The row number is the label length; the encoded bytes are
	/// the label bytes.
	fn mk_row(label: &str) -> Row {
		Row {
			number: RowNumber(label.len() as u64),
			encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
			layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
		}
	}

	/// Regroups a partition by flow id (ordered), with each flow's units sorted by version.
	fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
		let mut map = BTreeMap::new();
		for flow_units in units.into_inner() {
			for unit in flow_units {
				map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
			}
		}
		for vec in map.values_mut() {
			vec.sort_by_key(|u| u.version);
		}
		map
	}

	/// Summarises a unit as `(version, source -> diff count)`.
	///
	/// For internal changes, the source is recovered from the synthetic node id
	/// assigned by `mk_sources`.
	fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
		let mut sources = BTreeMap::new();

		for change in &unit.source_changes {
			match change.origin {
				crate::flow::FlowChangeOrigin::External(source_id) => {
					let count = change.diffs.len();
					*sources.entry(source_id).or_insert(0) += count;
				}
				crate::flow::FlowChangeOrigin::Internal(node_id) => {
					// Node ids are `source * 1000 + flow` (see `mk_sources`).
					let source_num = node_id.0 / 1000;
					let source_id = SourceId::Table(reifydb_core::interface::TableId(source_num));
					let count = change.diffs.len();
					*sources.entry(source_id).or_insert(0) += count;
				}
			}
		}
		(unit.version, sources)
	}

	/// Shorthand for a table source id.
	fn s(id: u64) -> SourceId {
		SourceId::Table(TableId(id))
	}

	/// Shorthand for a flow id.
	fn f(id: u64) -> FlowId {
		FlowId(id)
	}

	/// Shorthand for a commit version.
	fn v(ver: u64) -> CommitVersion {
		CommitVersion(ver)
	}

	/// Asserts that two normalized partitions have the same flows and the same
	/// per-unit snapshots.
	fn assert_normalized_eq(
		result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
		expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
	) {
		assert_eq!(result.len(), expected.len(), "Different number of flows");

		for (flow_id, result_units) in result {
			// Build the panic message lazily; `expect(&format!(..))` would allocate on every lookup.
			let expected_units = expected
				.get(flow_id)
				.unwrap_or_else(|| panic!("Flow {:?} missing in expected", flow_id));
			assert_eq!(
				result_units.len(),
				expected_units.len(),
				"Flow {:?} has different unit count",
				flow_id
			);

			for (i, (result_unit, expected_unit)) in
				result_units.iter().zip(expected_units.iter()).enumerate()
			{
				let result_snapshot = snapshot_unit(result_unit);
				let expected_snapshot = snapshot_unit(expected_unit);
				assert_eq!(
					result_snapshot, expected_snapshot,
					"Flow {:?} unit {} differs: {:?} vs {:?}",
					flow_id, i, result_snapshot, expected_snapshot
				);
			}
		}
	}

	#[test]
	fn test_empty_input() {
		let engine = setup_test_engine(HashMap::new());
		let input = HashMap::new();

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "Empty input should produce empty output");
	}

	#[test]
	fn test_single_version_single_source_single_flow() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 1);
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);

		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(10));
		assert_eq!(sources.get(&s(1)), Some(&2));
	}

	#[test]
	fn test_single_version_multi_flow_fanout() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 3);

		for flow_id in [f(1), f(2), f(3)] {
			let units = &normalized[&flow_id];
			assert_eq!(units.len(), 1);
			let (ver, sources) = snapshot_unit(&units[0]);
			assert_eq!(ver, v(1));
			assert_eq!(sources.get(&s(1)), Some(&1));
		}
	}

	#[test]
	fn test_single_version_multi_source_partial_overlap() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 3);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(1)), Some(&1));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));

		let f3_units = &normalized[&f(3)];
		assert_eq!(f3_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f3_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(2)), Some(&2));
	}

	#[test]
	fn test_unknown_source_filtered() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "Unknown sources should produce no units");
	}

	#[test]
	fn test_multi_version_ordering() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
		input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		assert_eq!(f2_units[0].version, v(30));
	}

	#[test]
	fn test_version_gaps_preserved() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(1));
		assert_eq!(f1_units[1].version, v(100));
	}

	#[test]
	fn test_duplicate_source_entries_merged() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(5));
		assert_eq!(sources.get(&s(1)), Some(&3));
	}

	#[test]
	fn test_flow_with_multiple_sources_same_version() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(8));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));
	}

	#[test]
	fn test_no_work_for_unaffected_flows() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 1);
		assert!(normalized.contains_key(&f(1)));
		assert!(!normalized.contains_key(&f(2)));
	}

	#[test]
	fn test_complex_multi_flow_multi_version() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f1_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));
		assert_eq!(sources1.get(&s(2)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f2_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));
	}

	#[test]
	fn test_large_diffs_zero_subscribers() {
		let engine = setup_test_engine(HashMap::new());

		let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), diffs)]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "No subscribers means no units");
	}

	#[test]
	fn test_many_versions_sparse_changes() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// Only versions 10, 50 and 90 touch a subscribed source.
		let mut input = HashMap::new();
		for i in 1..=100 {
			if i == 10 || i == 50 || i == 90 {
				input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
			} else {
				input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
			}
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(50));
		assert_eq!(f1_units[2].version, v(90));
	}

	#[test]
	fn test_many_sources_selective_subscription() {
		// Flow 1 subscribes only to sources 5, 15, 25, 35 and 45.
		let mut subscriptions = HashMap::new();
		for i in 1..=50 {
			if i % 10 == 5 {
				subscriptions.insert(s(i), vec![f(1)]);
			}
		}
		let engine = setup_test_engine(subscriptions);

		let mut changes = vec![];
		for i in 1..=50 {
			changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
		}
		let mut input = HashMap::new();
		input.insert(v(1), changes);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (_, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(sources.len(), 5);
		assert!(sources.contains_key(&s(5)));
		assert!(sources.contains_key(&s(15)));
		assert!(sources.contains_key(&s(25)));
		assert!(sources.contains_key(&s(35)));
		assert!(sources.contains_key(&s(45)));
	}

	#[test]
	fn test_input_permutation_invariance() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		let engine = setup_test_engine(subscriptions);

		let test_versions =
			vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];

		let mut results = vec![];
		for versions in test_versions {
			let mut input = HashMap::new();
			for ver in versions {
				input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
			}
			let result = engine.create_partition(input);
			results.push(normalize(result));
		}

		for other in &results[1..] {
			assert_normalized_eq(&results[0], other);
		}

		let f1_units = &results[0][&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));
		assert_eq!(f1_units[2].version, v(30));
	}

	#[test]
	fn test_empty_diff_vec_handling() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		// An empty diff list for a subscribed source still yields a (zero-diff) unit.
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(1));
		assert_eq!(sources.get(&s(1)), Some(&0));
	}

	#[test]
	fn test_all_sources_unknown() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
		input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "All unknown sources should produce empty output");
	}

	#[test]
	fn test_permutation_regression_fanout() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let expected = normalize(engine.create_partition(input.clone()));

		for _ in 0..5 {
			let mut entries: Vec<_> = input.clone().into_iter().collect();
			entries.shuffle(&mut rng());
			let shuffled: HashMap<_, _> = entries.into_iter().collect();

			let result = normalize(engine.create_partition(shuffled));
			assert_normalized_eq(&result, &expected);
		}
	}

	#[test]
	fn test_permutation_regression_complex() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);

		let expected = normalize(engine.create_partition(input.clone()));

		for _ in 0..5 {
			let mut entries: Vec<_> = input.clone().into_iter().collect();
			entries.shuffle(&mut rng());
			let shuffled: HashMap<_, _> = entries.into_iter().collect();

			let result = normalize(engine.create_partition(shuffled));
			assert_normalized_eq(&result, &expected);
		}
	}

	#[test]
	fn test_large_input_smoke() {
		// Each flow subscribes to three consecutive sources (wrapping within 1..=20).
		let mut subscriptions = HashMap::new();
		for flow_i in 1..=20 {
			for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
				let source_id = s(source_i % 20 + 1);
				subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
			}
		}
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		for ver_i in 1..=1000 {
			let mut changes = vec![];
			for source_i in 1..=5 {
				changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
			}
			input.insert(v(ver_i), changes);
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert!(!normalized.is_empty());

		for units in normalized.values() {
			for pair in units.windows(2) {
				assert!(pair[0].version < pair[1].version, "Versions not sorted");
			}
		}
	}

	#[test]
	fn test_version_ordering_maintained_under_stress() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut rng = rng();
		let mut versions: Vec<u64> = (1..=100).collect();
		for _ in 0..10 {
			versions.shuffle(&mut rng);
		}

		let mut input = HashMap::new();
		for ver in versions {
			input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 100);

		for (i, unit) in f1_units.iter().enumerate() {
			assert_eq!(unit.version, v(i as u64 + 1));
		}
	}

	/// Checks the raw (un-normalized) partition structure. Each affected flow must
	/// appear in exactly one non-empty group, and that group's units must be in
	/// strictly ascending version order.
	#[test]
	fn test_each_flow_in_single_ordered_group() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = HashMap::new();
		input.insert(v(3), vec![(s(1), vec![mk_diff("d")])]);
		input.insert(v(1), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b")])]);
		input.insert(v(2), vec![(s(2), vec![mk_diff("c")])]);

		let result = engine.create_partition(input);

		let mut seen = HashSet::new();
		for group in result.into_inner() {
			let units: Vec<UnitOfWork> = group.into_iter().collect();
			assert!(!units.is_empty(), "Empty group in partition");

			let flow_id = units[0].flow_id;
			assert!(units.iter().all(|u| u.flow_id == flow_id), "Mixed flow ids in one group");
			assert!(seen.insert(flow_id), "Flow {:?} split across groups", flow_id);

			for pair in units.windows(2) {
				assert!(pair[0].version < pair[1].version, "Units not in version order");
			}
		}
		assert_eq!(seen.len(), 3);
	}
}
866}