1use std::collections::BTreeMap;
5
6use indexmap::IndexMap;
7use reifydb_core::{
8 CommitVersion,
9 interface::{FlowId, SourceId},
10 log_trace,
11};
12use reifydb_flow_operator_sdk::{FlowChange, FlowDiff};
13
14use crate::worker::{UnitOfWork, UnitsOfWork};
15
16impl crate::engine::FlowEngine {
17 pub fn create_partition(
30 &self,
31 changes_by_version: BTreeMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
32 ) -> UnitsOfWork {
33 let mut all_units_by_flow: BTreeMap<FlowId, Vec<UnitOfWork>> = BTreeMap::new();
34
35 for (version, changes) in &changes_by_version {
37 let version = *version;
38
39 let mut changes_by_source: IndexMap<SourceId, Vec<FlowDiff>> = IndexMap::new();
42 for (source_id, diffs) in changes {
43 changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
44 }
45
46 let version_units = self.partition_into_units_of_work(changes_by_source, version);
48
49 for flow_units in version_units.into_inner() {
52 for unit in flow_units {
53 all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
54 }
55 }
56 }
57
58 let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
60
61 for (seq, flow_units) in units_vec.iter().enumerate() {
63 if !flow_units.is_empty() {
64 let flow_id = flow_units[0].flow_id;
65 let versions: Vec<_> = flow_units.iter().map(|u| u.version.0).collect();
66 log_trace!("[PARTITION] OUT seq={} flow={:?} versions={:?}", seq, flow_id, versions);
67 }
68 }
69
70 {
73 use std::collections::HashSet;
74 let mut seen_flows = HashSet::new();
75
76 for flow_units in &units_vec {
77 assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
78
79 let flow_id = flow_units[0].flow_id;
80 assert!(
81 !seen_flows.contains(&flow_id),
82 "INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
83 This means the same flow will be processed by multiple parallel tasks, \
84 causing keyspace overlap.",
85 flow_id
86 );
87
88 for unit in flow_units {
90 assert_eq!(
91 unit.flow_id, flow_id,
92 "INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
93 All units in a Vec must belong to the same flow.",
94 flow_id, unit.flow_id
95 );
96 }
97
98 seen_flows.insert(flow_id);
99 }
100 }
101
102 UnitsOfWork::new(units_vec)
103 }
104
105 fn partition_into_units_of_work(
106 &self,
107 changes_by_source: IndexMap<SourceId, Vec<FlowDiff>>,
108 version: CommitVersion,
109 ) -> UnitsOfWork {
110 let mut flow_changes: BTreeMap<FlowId, Vec<FlowChange>> = BTreeMap::new();
112
113 let sources = self.inner.sources.read();
115 let flow_creation_version = self.inner.flow_creation_versions.read();
116
117 for (source_id, diffs) in changes_by_source {
119 if let Some(subscriptions) = sources.get(&source_id) {
121 for (flow_id, node_id) in subscriptions {
122 if let Some(&flow_creation_version) = flow_creation_version.get(flow_id) {
124 if version < flow_creation_version {
125 continue;
126 }
127 }
128
129 let change = FlowChange::internal(*node_id, version, diffs.clone());
133 flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
134 }
135 }
136 }
137
138 let units: Vec<Vec<UnitOfWork>> = flow_changes
141 .into_iter()
142 .map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
143 .collect();
144
145 UnitsOfWork::new(units)
146 }
147}
148
#[cfg(test)]
mod tests {
	use std::{
		collections::{BTreeMap, HashMap},
		sync::Arc,
	};

	use parking_lot::RwLock;
	use rand::{rng, seq::SliceRandom};
	use reifydb_core::{
		CommitVersion, Row,
		event::EventBus,
		interface::{FlowId, FlowNodeId, SourceId, TableId},
		util::CowVec,
		value::encoded::{EncodedValues, EncodedValuesNamedLayout},
	};
	use reifydb_engine::{StandardRowEvaluator, execute::Executor};
	use reifydb_flow_operator_sdk::{FlowChangeOrigin, FlowDiff};
	use reifydb_rql::flow::FlowGraphAnalyzer;
	use reifydb_type::{RowNumber, Type};

	use crate::{
		engine::{FlowEngine, FlowEngineInner},
		operator::transform::registry::TransformOperatorRegistry,
		worker::{UnitOfWork, UnitsOfWork},
	};

	/// Input shape accepted by `FlowEngine::create_partition`.
	type PartitionInput = BTreeMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>;

	/// Builds the source-subscription table used by the engine.
	///
	/// Each `(source, flow)` pair gets the synthetic node id `source * 1000 + flow`.
	/// `snapshot_unit` decodes that id back to the source, which works while flow
	/// ids stay below 1000. Only table sources are supported.
	fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
		let mut sources_map = HashMap::new();
		for (source_id, flows) in subscriptions {
			let source_num = match source_id {
				SourceId::Table(tid) => tid.0,
				_ => panic!("Only Table sources supported in tests"),
			};
			let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
				.into_iter()
				.map(|flow_id| {
					let node_id = FlowNodeId(source_num * 1000 + flow_id.0);
					(flow_id, node_id)
				})
				.collect();
			sources_map.insert(source_id, subscriptions_with_nodes);
		}
		sources_map
	}

	/// Creates a `FlowEngine` whose only populated state is the given subscriptions.
	fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
		let evaluator = StandardRowEvaluator::default();
		let executor = Executor::testing();
		let registry = TransformOperatorRegistry::new();

		let sources = mk_sources(subscriptions);

		let inner = FlowEngineInner {
			evaluator,
			executor,
			registry,
			operators: RwLock::new(HashMap::new()),
			flows: RwLock::new(HashMap::new()),
			sources: RwLock::new(sources),
			sinks: RwLock::new(HashMap::new()),
			analyzer: RwLock::new(FlowGraphAnalyzer::new()),
			event_bus: EventBus::new(),
			flow_creation_versions: RwLock::new(HashMap::new()),
		};

		FlowEngine {
			inner: Arc::new(inner),
		}
	}

	/// Creates an insert diff whose row payload is derived from `label`.
	fn mk_diff(label: &str) -> FlowDiff {
		let row = mk_row(label);
		FlowDiff::Insert {
			post: row,
		}
	}

	/// Creates a row whose number is `label.len()` and whose bytes are `label`.
	fn mk_row(label: &str) -> Row {
		Row {
			number: RowNumber(label.len() as u64),
			encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
			layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
		}
	}

	/// Groups units by flow and sorts each flow's units by version, so results
	/// can be compared independently of output ordering.
	fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
		let mut map = BTreeMap::new();
		for flow_units in units.into_inner() {
			for unit in flow_units {
				map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
			}
		}
		for vec in map.values_mut() {
			vec.sort_by_key(|u| u.version);
		}
		map
	}

	/// Summarizes a unit as `(version, source -> diff count)`.
	///
	/// Internal origins are mapped back to their source via the
	/// `source * 1000 + flow` node-id scheme from `mk_sources`.
	fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
		let mut sources = BTreeMap::new();

		for change in &unit.source_changes {
			match change.origin {
				FlowChangeOrigin::External(source_id) => {
					let count = change.diffs.len();
					*sources.entry(source_id).or_insert(0) += count;
				}
				FlowChangeOrigin::Internal(node_id) => {
					let source_num = node_id.0 / 1000;
					let source_id = SourceId::Table(TableId(source_num));
					let count = change.diffs.len();
					*sources.entry(source_id).or_insert(0) += count;
				}
			}
		}
		(unit.version, sources)
	}

	/// Shorthand for a table source id.
	fn s(id: u64) -> SourceId {
		SourceId::Table(TableId(id))
	}

	/// Shorthand for a flow id.
	fn f(id: u64) -> FlowId {
		FlowId(id)
	}

	/// Shorthand for a commit version.
	fn v(ver: u64) -> CommitVersion {
		CommitVersion(ver)
	}

	/// Returns a copy of `input` with each version's `(source, diffs)` list shuffled.
	///
	/// `BTreeMap` always iterates in key order, so permuting the versions cannot
	/// change what `create_partition` sees. The order of sources within a
	/// version is the only input ordering that can vary.
	fn shuffle_sources_within_versions(input: &PartitionInput) -> PartitionInput {
		let mut rng = rng();
		input
			.iter()
			.map(|(version, changes)| {
				let mut changes = changes.clone();
				changes.shuffle(&mut rng);
				(*version, changes)
			})
			.collect()
	}

	/// Asserts two normalized partitions contain the same flows, unit counts
	/// and per-unit `(version, source -> diff count)` snapshots.
	fn assert_normalized_eq(
		result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
		expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
	) {
		assert_eq!(result.len(), expected.len(), "Different number of flows");

		for (flow_id, result_units) in result {
			let expected_units = expected
				.get(flow_id)
				.unwrap_or_else(|| panic!("Flow {:?} missing in expected", flow_id));
			assert_eq!(
				result_units.len(),
				expected_units.len(),
				"Flow {:?} has different unit count",
				flow_id
			);

			for (i, (result_unit, expected_unit)) in
				result_units.iter().zip(expected_units.iter()).enumerate()
			{
				let result_snapshot = snapshot_unit(result_unit);
				let expected_snapshot = snapshot_unit(expected_unit);
				assert_eq!(
					result_snapshot, expected_snapshot,
					"Flow {:?} unit {} differs: {:?} vs {:?}",
					flow_id, i, result_snapshot, expected_snapshot
				);
			}
		}
	}

	#[test]
	fn test_empty_input() {
		let engine = setup_test_engine(HashMap::new());
		let input = BTreeMap::new();

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "Empty input should produce empty output");
	}

	#[test]
	fn test_single_version_single_source_single_flow() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 1);
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);

		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(10));
		assert_eq!(sources.get(&s(1)), Some(&2));
	}

	#[test]
	fn test_single_version_multi_flow_fanout() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 3);

		for flow_id in [f(1), f(2), f(3)] {
			let units = &normalized[&flow_id];
			assert_eq!(units.len(), 1);
			let (ver, sources) = snapshot_unit(&units[0]);
			assert_eq!(ver, v(1));
			assert_eq!(sources.get(&s(1)), Some(&1));
		}
	}

	#[test]
	fn test_single_version_multi_source_partial_overlap() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 3);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(1)), Some(&1));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));

		let f3_units = &normalized[&f(3)];
		assert_eq!(f3_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f3_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(2)), Some(&2));
	}

	#[test]
	fn test_unknown_source_filtered() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "Unknown sources should produce no units");
	}

	#[test]
	fn test_multi_version_ordering() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
		input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		assert_eq!(f2_units[0].version, v(30));
	}

	#[test]
	fn test_version_gaps_preserved() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(1));
		assert_eq!(f1_units[1].version, v(100));
	}

	#[test]
	fn test_duplicate_source_entries_merged() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(5));
		assert_eq!(sources.get(&s(1)), Some(&3));
	}

	#[test]
	fn test_flow_with_multiple_sources_same_version() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(8));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));
	}

	#[test]
	fn test_no_work_for_unaffected_flows() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 1);
		assert!(normalized.contains_key(&f(1)));
		assert!(!normalized.contains_key(&f(2)));
	}

	#[test]
	fn test_complex_multi_flow_multi_version() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f1_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));
		assert_eq!(sources1.get(&s(2)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f2_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));
	}

	#[test]
	fn test_large_diffs_zero_subscribers() {
		let engine = setup_test_engine(HashMap::new());

		let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), diffs)]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "No subscribers means no units");
	}

	#[test]
	fn test_many_versions_sparse_changes() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		for i in 1..=100 {
			if i == 10 || i == 50 || i == 90 {
				input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
			} else {
				// Changes on an unsubscribed source must not produce work.
				input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
			}
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(50));
		assert_eq!(f1_units[2].version, v(90));
	}

	#[test]
	fn test_many_sources_selective_subscription() {
		let mut subscriptions = HashMap::new();
		for i in 1..=50 {
			if i % 10 == 5 {
				subscriptions.insert(s(i), vec![f(1)]);
			}
		}
		let engine = setup_test_engine(subscriptions);

		let mut changes = vec![];
		for i in 1..=50 {
			changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
		}
		let mut input = BTreeMap::new();
		input.insert(v(1), changes);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (_, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(sources.len(), 5);
		assert!(sources.contains_key(&s(5)));
		assert!(sources.contains_key(&s(15)));
		assert!(sources.contains_key(&s(25)));
		assert!(sources.contains_key(&s(35)));
		assert!(sources.contains_key(&s(45)));
	}

	#[test]
	fn test_input_permutation_invariance() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		let engine = setup_test_engine(subscriptions);

		// Only the order of insertion into the map varies here; `BTreeMap`
		// canonicalizes key order before `create_partition` sees it.
		let test_versions =
			vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];

		let mut results = vec![];
		for versions in test_versions {
			let mut input = BTreeMap::new();
			for ver in versions {
				input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
			}
			let result = engine.create_partition(input);
			results.push(normalize(result));
		}

		for other in &results[1..] {
			assert_normalized_eq(&results[0], other);
		}

		let f1_units = &results[0][&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));
		assert_eq!(f1_units[2].version, v(30));
	}

	#[test]
	fn test_empty_diff_vec_handling() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		// An empty diff list still yields a unit (carrying zero diffs).
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(1));
		assert_eq!(sources.get(&s(1)), Some(&0));
	}

	#[test]
	fn test_all_sources_unknown() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
		input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);

		let result = engine.create_partition(input);

		assert!(result.is_empty(), "All unknown sources should produce empty output");
	}

	#[test]
	fn test_permutation_regression_fanout() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let expected = normalize(engine.create_partition(input.clone()));

		for _ in 0..5 {
			let shuffled = shuffle_sources_within_versions(&input);
			let result = normalize(engine.create_partition(shuffled));
			assert_normalized_eq(&result, &expected);
		}
	}

	#[test]
	fn test_permutation_regression_complex() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);

		let expected = normalize(engine.create_partition(input.clone()));

		for _ in 0..5 {
			let shuffled = shuffle_sources_within_versions(&input);
			let result = normalize(engine.create_partition(shuffled));
			assert_normalized_eq(&result, &expected);
		}
	}

	#[test]
	fn test_large_input_smoke() {
		let mut subscriptions = HashMap::new();
		for flow_i in 1..=20 {
			for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
				let source_id = s(source_i % 20 + 1);
				subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
			}
		}
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		for ver_i in 1..=1000 {
			let mut changes = vec![];
			for source_i in 1..=5 {
				changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
			}
			input.insert(v(ver_i), changes);
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		assert!(!normalized.is_empty());

		for units in normalized.values() {
			for pair in units.windows(2) {
				assert!(pair[0].version < pair[1].version, "Versions not sorted");
			}
		}
	}

	#[test]
	fn test_version_ordering_maintained_under_stress() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut rng = rng();
		let mut versions: Vec<u64> = (1..=100).collect();
		for _ in 0..10 {
			versions.shuffle(&mut rng);
		}

		let mut input = BTreeMap::new();
		for ver in versions {
			input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
		}

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 100);

		for (expected, unit) in (1..=100u64).zip(f1_units.iter()) {
			assert_eq!(unit.version, v(expected));
		}
	}

	#[test]
	fn test_version_filtering() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().insert(f(1), v(50));

		let mut input = BTreeMap::new();
		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
		input.insert(v(50), vec![(s(1), vec![mk_diff("d50")])]);
		input.insert(v(51), vec![(s(1), vec![mk_diff("d51")])]);
		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
		input.insert(v(70), vec![(s(1), vec![mk_diff("d70")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 4);
		assert_eq!(f1_units[0].version, v(50));
		assert_eq!(f1_units[1].version, v(51));
		assert_eq!(f1_units[2].version, v(60));
		assert_eq!(f1_units[3].version, v(70));
	}

	#[test]
	fn test_backfill_version_per_flow_isolation() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().insert(f(1), v(30));
		engine.inner.flow_creation_versions.write().insert(f(2), v(50));

		let mut input = BTreeMap::new();
		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(40));
		assert_eq!(f1_units[1].version, v(60));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		assert_eq!(f2_units[0].version, v(60));
	}

	#[test]
	fn test_no_backfill_version_processes_all() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(10), vec![(s(1), vec![mk_diff("d10")])]);
		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
		input.insert(v(30), vec![(s(1), vec![mk_diff("d30")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 3);
	}

	#[test]
	fn test_backfill_version_exact_boundary() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().insert(f(1), v(100));

		let mut input = BTreeMap::new();
		input.insert(v(99), vec![(s(1), vec![mk_diff("d99")])]);
		input.insert(v(100), vec![(s(1), vec![mk_diff("d100")])]);
		input.insert(v(101), vec![(s(1), vec![mk_diff("d101")])]);

		let result = engine.create_partition(input);
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(100));
		assert_eq!(f1_units[1].version, v(101));
	}
}