1use std::collections::BTreeMap;
5
6use indexmap::IndexMap;
7use reifydb_core::{
8 CommitVersion,
9 interface::{FlowId, SourceId},
10};
11use reifydb_flow_operator_sdk::{FlowChange, FlowDiff};
12
13use crate::worker::{UnitOfWork, UnitsOfWork};
14
15impl crate::engine::FlowEngine {
16 pub async fn create_partition(
29 &self,
30 changes_by_version: BTreeMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
31 ) -> UnitsOfWork {
32 let mut all_units_by_flow: BTreeMap<FlowId, Vec<UnitOfWork>> = BTreeMap::new();
33
34 for (version, changes) in &changes_by_version {
36 let version = *version;
37
38 let mut changes_by_source: IndexMap<SourceId, Vec<FlowDiff>> = IndexMap::new();
41 for (source_id, diffs) in changes {
42 changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
43 }
44
45 let version_units = self.partition_into_units_of_work(changes_by_source, version).await;
47
48 for flow_units in version_units.into_inner() {
51 for unit in flow_units {
52 all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
53 }
54 }
55 }
56
57 let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
59
60 {
63 use std::collections::HashSet;
64 let mut seen_flows = HashSet::new();
65
66 for flow_units in &units_vec {
67 assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
68
69 let flow_id = flow_units[0].flow_id;
70 assert!(
71 !seen_flows.contains(&flow_id),
72 "INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
73 This means the same flow will be processed by multiple parallel tasks, \
74 causing keyspace overlap.",
75 flow_id
76 );
77
78 for unit in flow_units {
80 assert_eq!(
81 unit.flow_id, flow_id,
82 "INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
83 All units in a Vec must belong to the same flow.",
84 flow_id, unit.flow_id
85 );
86 }
87
88 seen_flows.insert(flow_id);
89 }
90 }
91
92 UnitsOfWork::new(units_vec)
93 }
94
95 async fn partition_into_units_of_work(
96 &self,
97 changes_by_source: IndexMap<SourceId, Vec<FlowDiff>>,
98 version: CommitVersion,
99 ) -> UnitsOfWork {
100 let mut flow_changes: BTreeMap<FlowId, Vec<FlowChange>> = BTreeMap::new();
102
103 let sources = self.inner.sources.read().await;
105 let flow_creation_version = self.inner.flow_creation_versions.read().await;
106
107 for (source_id, diffs) in changes_by_source {
109 if let Some(subscriptions) = sources.get(&source_id) {
111 for (flow_id, node_id) in subscriptions {
112 if let Some(&flow_creation_version) = flow_creation_version.get(flow_id) {
114 if version < flow_creation_version {
115 continue;
116 }
117 }
118
119 let change = FlowChange::internal(*node_id, version, diffs.clone());
123 flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
124 }
125 }
126 }
127
128 let units: Vec<Vec<UnitOfWork>> = flow_changes
131 .into_iter()
132 .map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
133 .collect();
134
135 UnitsOfWork::new(units)
136 }
137}
138
139#[cfg(test)]
140mod tests {
141 use std::{
142 collections::{BTreeMap, HashMap},
143 sync::Arc,
144 };
145
146 use rand::{rng, seq::SliceRandom};
147 use reifydb_core::{
148 CommitVersion, Row,
149 event::EventBus,
150 interface::{FlowId, FlowNodeId, SourceId, TableId},
151 util::CowVec,
152 value::encoded::{EncodedValues, EncodedValuesNamedLayout},
153 };
154 use reifydb_engine::{StandardRowEvaluator, execute::Executor};
155 use reifydb_flow_operator_sdk::{FlowChangeOrigin, FlowDiff};
156 use reifydb_rql::flow::FlowGraphAnalyzer;
157 use reifydb_type::{RowNumber, Type};
158 use tokio::sync::RwLock;
159
160 use crate::{
161 engine::{FlowEngine, FlowEngineInner},
162 operator::transform::registry::TransformOperatorRegistry,
163 worker::{UnitOfWork, UnitsOfWork},
164 };
165
166 fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
171 let mut sources_map = HashMap::new();
172 for (source_id, flows) in subscriptions {
173 let source_num = match source_id {
174 SourceId::Table(tid) => tid.0,
175 _ => panic!("Only Table sources supported in tests"),
176 };
177 let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
178 .into_iter()
179 .map(|flow_id| {
180 let node_id = FlowNodeId(source_num * 1000 + flow_id.0);
182 (flow_id, node_id)
183 })
184 .collect();
185 sources_map.insert(source_id, subscriptions_with_nodes);
186 }
187 sources_map
188 }
189
190 fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
191 let evaluator = StandardRowEvaluator::default();
192 let executor = Executor::testing();
193 let registry = TransformOperatorRegistry::new();
194
195 let sources = mk_sources(subscriptions);
196
197 let inner = FlowEngineInner {
198 evaluator,
199 executor,
200 registry,
201 operators: RwLock::new(HashMap::new()),
202 flows: RwLock::new(HashMap::new()),
203 sources: RwLock::new(sources),
204 sinks: RwLock::new(HashMap::new()),
205 analyzer: RwLock::new(FlowGraphAnalyzer::new()),
206 event_bus: EventBus::new(),
207 flow_creation_versions: RwLock::new(HashMap::new()),
208 };
209
210 FlowEngine {
211 inner: Arc::new(inner),
212 }
213 }
214
215 fn mk_diff(label: &str) -> FlowDiff {
217 let row = mk_row(label);
218 FlowDiff::Insert {
219 post: row,
220 }
221 }
222
223 fn mk_row(label: &str) -> Row {
225 Row {
226 number: RowNumber(label.len() as u64),
227 encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
228 layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
229 }
230 }
231
232 fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
234 let mut map = BTreeMap::new();
235 for flow_units in units.into_inner() {
236 for unit in flow_units {
237 map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
238 }
239 }
240 for vec in map.values_mut() {
242 vec.sort_by_key(|u| u.version);
243 }
244 map
245 }
246
247 fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
251 let mut sources = BTreeMap::new();
252
253 for change in &unit.source_changes {
254 match change.origin {
255 FlowChangeOrigin::External(source_id) => {
256 let count = change.diffs.len();
257 *sources.entry(source_id).or_insert(0) += count;
258 }
259 FlowChangeOrigin::Internal(node_id) => {
260 let source_num = node_id.0 / 1000;
263 let source_id = SourceId::Table(TableId(source_num));
264 let count = change.diffs.len();
265 *sources.entry(source_id).or_insert(0) += count;
266 }
267 }
268 }
269 (unit.version, sources)
270 }
271
    /// Shorthand for a table-backed `SourceId` with the given table number.
    fn s(id: u64) -> SourceId {
        SourceId::Table(TableId(id))
    }
276
    /// Shorthand for constructing a `FlowId`.
    fn f(id: u64) -> FlowId {
        FlowId(id)
    }
281
    /// Shorthand for constructing a `CommitVersion`.
    fn v(ver: u64) -> CommitVersion {
        CommitVersion(ver)
    }
286
287 fn assert_normalized_eq(
289 result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
290 expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
291 ) {
292 assert_eq!(result.len(), expected.len(), "Different number of flows");
293
294 for (flow_id, result_units) in result {
295 let expected_units =
296 expected.get(flow_id).expect(&format!("Flow {:?} missing in expected", flow_id));
297 assert_eq!(
298 result_units.len(),
299 expected_units.len(),
300 "Flow {:?} has different unit count",
301 flow_id
302 );
303
304 for (i, (result_unit, expected_unit)) in
305 result_units.iter().zip(expected_units.iter()).enumerate()
306 {
307 let result_snapshot = snapshot_unit(result_unit);
308 let expected_snapshot = snapshot_unit(expected_unit);
309 assert_eq!(
310 result_snapshot, expected_snapshot,
311 "Flow {:?} unit {} differs: {:?} vs {:?}",
312 flow_id, i, result_snapshot, expected_snapshot
313 );
314 }
315 }
316 }
317
318 #[tokio::test]
319 async fn test_empty_input() {
320 let engine = setup_test_engine(HashMap::new());
321 let input = BTreeMap::new();
322
323 let result = engine.create_partition(input).await;
324
325 assert!(result.is_empty(), "Empty input should produce empty output");
326 }
327
328 #[tokio::test]
329 async fn test_single_version_single_source_single_flow() {
330 let mut subscriptions = HashMap::new();
332 subscriptions.insert(s(1), vec![f(1)]);
333 let engine = setup_test_engine(subscriptions);
334
335 let mut input = BTreeMap::new();
337 input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);
338
339 let result = engine.create_partition(input).await;
340 let normalized = normalize(result);
341
342 assert_eq!(normalized.len(), 1);
344 let f1_units = &normalized[&f(1)];
345 assert_eq!(f1_units.len(), 1);
346
347 let (ver, sources) = snapshot_unit(&f1_units[0]);
348 assert_eq!(ver, v(10));
349 assert_eq!(sources.get(&s(1)), Some(&2));
350 }
351
352 #[tokio::test]
353 async fn test_single_version_multi_flow_fanout() {
354 let mut subscriptions = HashMap::new();
356 subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
357 let engine = setup_test_engine(subscriptions);
358
359 let mut input = BTreeMap::new();
361 input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
362
363 let result = engine.create_partition(input).await;
364 let normalized = normalize(result);
365
366 assert_eq!(normalized.len(), 3);
368
369 for flow_id in [f(1), f(2), f(3)] {
370 let units = &normalized[&flow_id];
371 assert_eq!(units.len(), 1);
372 let (ver, sources) = snapshot_unit(&units[0]);
373 assert_eq!(ver, v(1));
374 assert_eq!(sources.get(&s(1)), Some(&1));
375 }
376 }
377
378 #[tokio::test]
379 async fn test_single_version_multi_source_partial_overlap() {
380 let mut subscriptions = HashMap::new();
382 subscriptions.insert(s(1), vec![f(1), f(2)]);
383 subscriptions.insert(s(2), vec![f(2), f(3)]);
384 let engine = setup_test_engine(subscriptions);
385
386 let mut input = BTreeMap::new();
388 input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
389
390 let result = engine.create_partition(input).await;
391 let normalized = normalize(result);
392
393 assert_eq!(normalized.len(), 3);
394
395 let f1_units = &normalized[&f(1)];
397 assert_eq!(f1_units.len(), 1);
398 let (ver, sources) = snapshot_unit(&f1_units[0]);
399 assert_eq!(ver, v(7));
400 assert_eq!(sources.len(), 1);
401 assert_eq!(sources.get(&s(1)), Some(&1));
402
403 let f2_units = &normalized[&f(2)];
405 assert_eq!(f2_units.len(), 1);
406 let (ver, sources) = snapshot_unit(&f2_units[0]);
407 assert_eq!(ver, v(7));
408 assert_eq!(sources.len(), 2);
409 assert_eq!(sources.get(&s(1)), Some(&1));
410 assert_eq!(sources.get(&s(2)), Some(&2));
411
412 let f3_units = &normalized[&f(3)];
414 assert_eq!(f3_units.len(), 1);
415 let (ver, sources) = snapshot_unit(&f3_units[0]);
416 assert_eq!(ver, v(7));
417 assert_eq!(sources.len(), 1);
418 assert_eq!(sources.get(&s(2)), Some(&2));
419 }
420
421 #[tokio::test]
422 async fn test_unknown_source_filtered() {
423 let mut subscriptions = HashMap::new();
425 subscriptions.insert(s(1), vec![f(1)]);
426 let engine = setup_test_engine(subscriptions);
427
428 let mut input = BTreeMap::new();
430 input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);
431
432 let result = engine.create_partition(input).await;
433
434 assert!(result.is_empty(), "Unknown sources should produce no units");
435 }
436
437 #[tokio::test]
438 async fn test_multi_version_ordering() {
439 let mut subscriptions = HashMap::new();
441 subscriptions.insert(s(1), vec![f(1)]);
442 subscriptions.insert(s(2), vec![f(2)]);
443 let engine = setup_test_engine(subscriptions);
444
445 let mut input = BTreeMap::new();
447 input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
448 input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
449 input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);
450
451 let result = engine.create_partition(input).await;
452 let normalized = normalize(result);
453
454 let f1_units = &normalized[&f(1)];
456 assert_eq!(f1_units.len(), 2);
457 assert_eq!(f1_units[0].version, v(10));
458 assert_eq!(f1_units[1].version, v(20));
459
460 let f2_units = &normalized[&f(2)];
462 assert_eq!(f2_units.len(), 1);
463 assert_eq!(f2_units[0].version, v(30));
464 }
465
466 #[tokio::test]
467 async fn test_version_gaps_preserved() {
468 let mut subscriptions = HashMap::new();
470 subscriptions.insert(s(1), vec![f(1)]);
471 let engine = setup_test_engine(subscriptions);
472
473 let mut input = BTreeMap::new();
475 input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
476 input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);
477
478 let result = engine.create_partition(input).await;
479 let normalized = normalize(result);
480
481 let f1_units = &normalized[&f(1)];
483 assert_eq!(f1_units.len(), 2);
484 assert_eq!(f1_units[0].version, v(1));
485 assert_eq!(f1_units[1].version, v(100));
486 }
487
488 #[tokio::test]
489 async fn test_duplicate_source_entries_merged() {
490 let mut subscriptions = HashMap::new();
492 subscriptions.insert(s(1), vec![f(1)]);
493 let engine = setup_test_engine(subscriptions);
494
495 let mut input = BTreeMap::new();
497 input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);
498
499 let result = engine.create_partition(input).await;
500 let normalized = normalize(result);
501
502 let f1_units = &normalized[&f(1)];
504 assert_eq!(f1_units.len(), 1);
505 let (ver, sources) = snapshot_unit(&f1_units[0]);
506 assert_eq!(ver, v(5));
507 assert_eq!(sources.get(&s(1)), Some(&3));
508 }
509
510 #[tokio::test]
511 async fn test_flow_with_multiple_sources_same_version() {
512 let mut subscriptions = HashMap::new();
514 subscriptions.insert(s(1), vec![f(1)]);
515 subscriptions.insert(s(2), vec![f(1)]);
516 let engine = setup_test_engine(subscriptions);
517
518 let mut input = BTreeMap::new();
520 input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
521
522 let result = engine.create_partition(input).await;
523 let normalized = normalize(result);
524
525 let f1_units = &normalized[&f(1)];
527 assert_eq!(f1_units.len(), 1);
528 let (ver, sources) = snapshot_unit(&f1_units[0]);
529 assert_eq!(ver, v(8));
530 assert_eq!(sources.len(), 2);
531 assert_eq!(sources.get(&s(1)), Some(&1));
532 assert_eq!(sources.get(&s(2)), Some(&2));
533 }
534
535 #[tokio::test]
536 async fn test_no_work_for_unaffected_flows() {
537 let mut subscriptions = HashMap::new();
539 subscriptions.insert(s(1), vec![f(1)]);
540 subscriptions.insert(s(2), vec![f(2)]);
541 let engine = setup_test_engine(subscriptions);
542
543 let mut input = BTreeMap::new();
545 input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);
546
547 let result = engine.create_partition(input).await;
548 let normalized = normalize(result);
549
550 assert_eq!(normalized.len(), 1);
552 assert!(normalized.contains_key(&f(1)));
553 assert!(!normalized.contains_key(&f(2)));
554 }
555
556 #[tokio::test]
557 async fn test_complex_multi_flow_multi_version() {
558 let mut subscriptions = HashMap::new();
560 subscriptions.insert(s(1), vec![f(1), f(2)]);
561 subscriptions.insert(s(2), vec![f(2)]);
562 let engine = setup_test_engine(subscriptions);
563
564 let mut input = BTreeMap::new();
567 input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
568 input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
569
570 let result = engine.create_partition(input).await;
571 let normalized = normalize(result);
572
573 let f1_units = &normalized[&f(1)];
575 assert_eq!(f1_units.len(), 2);
576
577 let (ver1, sources1) = snapshot_unit(&f1_units[0]);
578 assert_eq!(ver1, v(1));
579 assert_eq!(sources1.get(&s(1)), Some(&1));
580
581 let (ver2, sources2) = snapshot_unit(&f1_units[1]);
582 assert_eq!(ver2, v(2));
583 assert_eq!(sources2.get(&s(1)), Some(&1));
584
585 let f2_units = &normalized[&f(2)];
587 assert_eq!(f2_units.len(), 2);
588
589 let (ver1, sources1) = snapshot_unit(&f2_units[0]);
590 assert_eq!(ver1, v(1));
591 assert_eq!(sources1.get(&s(1)), Some(&1));
592 assert_eq!(sources1.get(&s(2)), Some(&1));
593
594 let (ver2, sources2) = snapshot_unit(&f2_units[1]);
595 assert_eq!(ver2, v(2));
596 assert_eq!(sources2.get(&s(1)), Some(&1));
597 }
598
599 #[tokio::test]
600 async fn test_large_diffs_zero_subscribers() {
601 let engine = setup_test_engine(HashMap::new());
602
603 let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
605 let mut input = BTreeMap::new();
606 input.insert(v(1), vec![(s(1), diffs)]);
607
608 let result = engine.create_partition(input).await;
609
610 assert!(result.is_empty(), "No subscribers means no units");
611 }
612
613 #[tokio::test]
614 async fn test_many_versions_sparse_changes() {
615 let mut subscriptions = HashMap::new();
617 subscriptions.insert(s(1), vec![f(1)]);
618 let engine = setup_test_engine(subscriptions);
619
620 let mut input = BTreeMap::new();
622 for i in 1..=100 {
623 if i == 10 || i == 50 || i == 90 {
624 input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
625 } else {
626 input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
628 }
629 }
630
631 let result = engine.create_partition(input).await;
632 let normalized = normalize(result);
633
634 let f1_units = &normalized[&f(1)];
636 assert_eq!(f1_units.len(), 3);
637 assert_eq!(f1_units[0].version, v(10));
638 assert_eq!(f1_units[1].version, v(50));
639 assert_eq!(f1_units[2].version, v(90));
640 }
641
642 #[tokio::test]
643 async fn test_many_sources_selective_subscription() {
644 let mut subscriptions = HashMap::new();
646 for i in 1..=50 {
647 if i % 10 == 5 {
648 subscriptions.insert(s(i), vec![f(1)]);
649 }
650 }
651 let engine = setup_test_engine(subscriptions);
652
653 let mut changes = vec![];
655 for i in 1..=50 {
656 changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
657 }
658 let mut input = BTreeMap::new();
659 input.insert(v(1), changes);
660
661 let result = engine.create_partition(input).await;
662 let normalized = normalize(result);
663
664 let f1_units = &normalized[&f(1)];
666 assert_eq!(f1_units.len(), 1);
667 let (_, sources) = snapshot_unit(&f1_units[0]);
668 assert_eq!(sources.len(), 5);
669 assert!(sources.contains_key(&s(5)));
670 assert!(sources.contains_key(&s(15)));
671 assert!(sources.contains_key(&s(25)));
672 assert!(sources.contains_key(&s(35)));
673 assert!(sources.contains_key(&s(45)));
674 }
675
676 #[tokio::test]
677 async fn test_input_permutation_invariance() {
678 let mut subscriptions = HashMap::new();
680 subscriptions.insert(s(1), vec![f(1), f(2)]);
681 let engine = setup_test_engine(subscriptions);
682
683 let test_versions =
685 vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];
686
687 let mut results = vec![];
688 for versions in test_versions {
689 let mut input = BTreeMap::new();
690 for ver in versions {
691 input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
692 }
693 let result = engine.create_partition(input).await;
694 results.push(normalize(result));
695 }
696
697 for i in 1..results.len() {
699 assert_normalized_eq(&results[0], &results[i]);
700 }
701
702 let f1_units = &results[0][&f(1)];
704 assert_eq!(f1_units.len(), 3);
705 assert_eq!(f1_units[0].version, v(10));
706 assert_eq!(f1_units[1].version, v(20));
707 assert_eq!(f1_units[2].version, v(30));
708 }
709
710 #[tokio::test]
711 async fn test_empty_diff_vec_handling() {
712 let mut subscriptions = HashMap::new();
714 subscriptions.insert(s(1), vec![f(1)]);
715 let engine = setup_test_engine(subscriptions);
716
717 let mut input = BTreeMap::new();
719 input.insert(v(1), vec![(s(1), vec![])]);
720
721 let result = engine.create_partition(input).await;
722 let normalized = normalize(result);
723
724 let f1_units = &normalized[&f(1)];
727 assert_eq!(f1_units.len(), 1);
728 let (ver, sources) = snapshot_unit(&f1_units[0]);
729 assert_eq!(ver, v(1));
730 assert_eq!(sources.get(&s(1)), Some(&0));
731 }
732
733 #[tokio::test]
734 async fn test_all_sources_unknown() {
735 let mut subscriptions = HashMap::new();
737 subscriptions.insert(s(1), vec![f(1)]);
738 let engine = setup_test_engine(subscriptions);
739
740 let mut input = BTreeMap::new();
742 input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
743 input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);
744
745 let result = engine.create_partition(input).await;
746
747 assert!(result.is_empty(), "All unknown sources should produce empty output");
748 }
749
750 #[tokio::test]
751 async fn test_permutation_regression_fanout() {
752 let mut subscriptions = HashMap::new();
754 subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
755 let engine = setup_test_engine(subscriptions);
756
757 let mut input = BTreeMap::new();
759 input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
760
761 let expected = normalize(engine.create_partition(input.clone()).await);
762
763 for _ in 0..5 {
765 let mut entries: Vec<_> = input.clone().into_iter().collect();
766 entries.shuffle(&mut rand::rng());
767 let shuffled: BTreeMap<_, _> = entries.into_iter().collect();
768
769 let result = normalize(engine.create_partition(shuffled).await);
770 assert_normalized_eq(&result, &expected);
771 }
772 }
773
774 #[tokio::test]
775 async fn test_permutation_regression_complex() {
776 use rand::prelude::*;
777
778 let mut subscriptions = HashMap::new();
780 subscriptions.insert(s(1), vec![f(1), f(2)]);
781 subscriptions.insert(s(2), vec![f(2)]);
782 let engine = setup_test_engine(subscriptions);
783
784 let mut input = BTreeMap::new();
786 input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
787 input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
788
789 let expected = normalize(engine.create_partition(input.clone()).await);
790
791 for _ in 0..5 {
793 let mut entries: Vec<_> = input.clone().into_iter().collect();
794 entries.shuffle(&mut rand::rng());
795 let shuffled: BTreeMap<_, _> = entries.into_iter().collect();
796
797 let result = normalize(engine.create_partition(shuffled).await);
798 assert_normalized_eq(&result, &expected);
799 }
800 }
801
802 #[tokio::test]
803 async fn test_large_input_smoke() {
804 let mut subscriptions = HashMap::new();
806 for flow_i in 1..=20 {
807 for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
809 let source_id = s(source_i % 20 + 1);
810 subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
811 }
812 }
813 let engine = setup_test_engine(subscriptions);
814
815 let mut input = BTreeMap::new();
817 for ver_i in 1..=1000 {
818 let mut changes = vec![];
819 for source_i in 1..=5 {
820 changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
821 }
822 input.insert(v(ver_i), changes);
823 }
824
825 let result = engine.create_partition(input).await;
827 let normalized = normalize(result);
828
829 assert!(!normalized.is_empty());
831
832 for (_, units) in normalized {
834 for i in 1..units.len() {
835 assert!(units[i - 1].version < units[i].version, "Versions not sorted");
836 }
837 }
838 }
839
840 #[tokio::test]
841 async fn test_version_ordering_maintained_under_stress() {
842 let mut subscriptions = HashMap::new();
843 subscriptions.insert(s(1), vec![f(1)]);
844 let engine = setup_test_engine(subscriptions);
845
846 let mut rng = rng();
847 let mut versions: Vec<u64> = (1..=100).collect();
848 for _ in 0..10 {
849 versions.shuffle(&mut rng);
850 }
851
852 let mut input = BTreeMap::new();
853 for ver in versions {
854 input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
855 }
856
857 let result = engine.create_partition(input).await;
858 let normalized = normalize(result);
859
860 let f1_units = &normalized[&f(1)];
861 assert_eq!(f1_units.len(), 100);
862
863 for i in 0..100 {
864 assert_eq!(f1_units[i].version, v((i + 1) as u64));
865 }
866 }
867
868 #[tokio::test]
869 async fn test_version_filtering() {
870 let mut subscriptions = HashMap::new();
871 subscriptions.insert(s(1), vec![f(1)]);
872 let engine = setup_test_engine(subscriptions);
873
874 engine.inner.flow_creation_versions.write().await.insert(f(1), v(50));
875
876 let mut input = BTreeMap::new();
877 input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
878 input.insert(v(50), vec![(s(1), vec![mk_diff("d50")])]);
879 input.insert(v(51), vec![(s(1), vec![mk_diff("d51")])]);
880 input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
881 input.insert(v(70), vec![(s(1), vec![mk_diff("d70")])]);
882
883 let result = engine.create_partition(input).await;
884 let normalized = normalize(result);
885
886 let f1_units = &normalized[&f(1)];
887 assert_eq!(f1_units.len(), 4);
888 assert_eq!(f1_units[0].version, v(50));
889 assert_eq!(f1_units[1].version, v(51));
890 assert_eq!(f1_units[2].version, v(60));
891 assert_eq!(f1_units[3].version, v(70));
892 }
893
894 #[tokio::test]
895 async fn test_backfill_version_per_flow_isolation() {
896 let mut subscriptions = HashMap::new();
897 subscriptions.insert(s(1), vec![f(1), f(2)]);
898 let engine = setup_test_engine(subscriptions);
899
900 engine.inner.flow_creation_versions.write().await.insert(f(1), v(30));
901 engine.inner.flow_creation_versions.write().await.insert(f(2), v(50));
902
903 let mut input = BTreeMap::new();
904 input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
905 input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
906 input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
907
908 let result = engine.create_partition(input).await;
909 let normalized = normalize(result);
910
911 let f1_units = &normalized[&f(1)];
912 assert_eq!(f1_units.len(), 2);
913 assert_eq!(f1_units[0].version, v(40));
914 assert_eq!(f1_units[1].version, v(60));
915
916 let f2_units = &normalized[&f(2)];
917 assert_eq!(f2_units.len(), 1);
918 assert_eq!(f2_units[0].version, v(60));
919 }
920
921 #[tokio::test]
922 async fn test_no_backfill_version_processes_all() {
923 let mut subscriptions = HashMap::new();
924 subscriptions.insert(s(1), vec![f(1)]);
925 let engine = setup_test_engine(subscriptions);
926
927 let mut input = BTreeMap::new();
928 input.insert(v(10), vec![(s(1), vec![mk_diff("d10")])]);
929 input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
930 input.insert(v(30), vec![(s(1), vec![mk_diff("d30")])]);
931
932 let result = engine.create_partition(input).await;
933 let normalized = normalize(result);
934
935 let f1_units = &normalized[&f(1)];
936 assert_eq!(f1_units.len(), 3);
937 }
938
939 #[tokio::test]
940 async fn test_backfill_version_exact_boundary() {
941 let mut subscriptions = HashMap::new();
942 subscriptions.insert(s(1), vec![f(1)]);
943 let engine = setup_test_engine(subscriptions);
944
945 engine.inner.flow_creation_versions.write().await.insert(f(1), v(100));
946
947 let mut input = BTreeMap::new();
948 input.insert(v(99), vec![(s(1), vec![mk_diff("d99")])]);
949 input.insert(v(100), vec![(s(1), vec![mk_diff("d100")])]);
950 input.insert(v(101), vec![(s(1), vec![mk_diff("d101")])]);
951
952 let result = engine.create_partition(input).await;
953 let normalized = normalize(result);
954
955 let f1_units = &normalized[&f(1)];
956 assert_eq!(f1_units.len(), 2);
957 assert_eq!(f1_units[0].version, v(100));
958 assert_eq!(f1_units[1].version, v(101));
959 }
960}