reifydb_sub_flow/engine/
partition.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::HashMap;
5
6use reifydb_core::{
7	CommitVersion,
8	interface::{FlowId, SourceId},
9};
10
11use crate::{
12	flow::{FlowChange, FlowDiff},
13	worker::{UnitOfWork, UnitsOfWork},
14};
15
16impl crate::engine::FlowEngine {
17	/// Partition changes from multiple versions into units of work grouped by flow
18	///
19	/// This method handles the complete partitioning logic:
20	/// 1. Processes each version's changes separately
21	/// 2. Groups units by flow across all versions
22	/// 3. Maintains version ordering within each flow
23	///
24	/// # Arguments
25	/// * `changes_by_version` - Map of version to (source_id, changes) pairs
26	///
27	/// # Returns
28	/// UnitsOfWork where each flow has its units ordered by version
29	pub fn create_partition(
30		&self,
31		changes_by_version: HashMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
32	) -> UnitsOfWork {
33		let mut all_units_by_flow: HashMap<FlowId, Vec<UnitOfWork>> = HashMap::new();
34
35		// Sort versions to maintain ordering
36		let mut versions: Vec<_> = changes_by_version.keys().copied().collect();
37		versions.sort();
38
39		for version in versions {
40			let changes = changes_by_version.get(&version).unwrap();
41
42			// Group changes by source for this version
43			let mut changes_by_source: HashMap<SourceId, Vec<FlowDiff>> = HashMap::new();
44			for (source_id, diffs) in changes {
45				changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
46			}
47
48			// Partition this version's changes into units of work
49			let version_units = self.partition_into_units_of_work(changes_by_source, version);
50
51			// Merge units from this version into the overall collection
52			// Each flow's units are stored in a Vec to maintain version ordering
53			for flow_units in version_units.into_inner() {
54				for unit in flow_units {
55					all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
56				}
57			}
58		}
59
60		// Convert the HashMap to UnitsOfWork for the worker
61		let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
62
63		// INVARIANT: Validate that each flow_id appears exactly once in the output
64		// and that each inner Vec contains units for only one flow
65		{
66			use std::collections::HashSet;
67			let mut seen_flows = HashSet::new();
68
69			for flow_units in &units_vec {
70				assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
71
72				let flow_id = flow_units[0].flow_id;
73				assert!(
74					!seen_flows.contains(&flow_id),
75					"INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
76					This means the same flow will be processed by multiple parallel tasks, \
77					causing keyspace overlap.",
78					flow_id
79				);
80
81				// Validate all units in this Vec are for the same flow
82				for unit in flow_units {
83					assert_eq!(
84						unit.flow_id, flow_id,
85						"INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
86						All units in a Vec must belong to the same flow.",
87						flow_id, unit.flow_id
88					);
89				}
90
91				seen_flows.insert(flow_id);
92			}
93		}
94
95		UnitsOfWork::new(units_vec)
96	}
97
98	fn partition_into_units_of_work(
99		&self,
100		changes_by_source: HashMap<SourceId, Vec<FlowDiff>>,
101		version: CommitVersion,
102	) -> UnitsOfWork {
103		// Map to collect all source changes per flow
104		let mut flow_changes: HashMap<FlowId, Vec<FlowChange>> = HashMap::new();
105
106		// Read source subscriptions
107		let sources = self.inner.sources.read();
108
109		// For each source that changed
110		for (source_id, diffs) in changes_by_source {
111			// Find all flows subscribed to this source
112			if let Some(subscriptions) = sources.get(&source_id) {
113				for (flow_id, node_id) in subscriptions {
114					// Create FlowChange scoped to the specific node in this flow
115					// This ensures each flow only processes its own nodes, preventing keyspace
116					// overlap
117					let change = FlowChange::internal(*node_id, version, diffs.clone());
118					flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
119				}
120			}
121		}
122
123		drop(sources);
124
125		// Group all units at this version into a single UnitsOfWork
126		// Since all units are at the same version, each flow gets exactly one unit
127		let units: Vec<Vec<UnitOfWork>> = flow_changes
128			.into_iter()
129			.map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
130			.collect();
131
132		UnitsOfWork::new(units)
133	}
134}
135
136#[cfg(test)]
137mod tests {
138	use std::{
139		collections::{BTreeMap, HashMap},
140		sync::Arc,
141	};
142
143	use parking_lot::RwLock;
144	use rand::{rng, seq::SliceRandom};
145	use reifydb_core::{
146		CommitVersion, Row,
147		event::EventBus,
148		interface::{FlowId, FlowNodeId, SourceId, TableId},
149		util::CowVec,
150		value::encoded::{EncodedValues, EncodedValuesNamedLayout},
151	};
152	use reifydb_engine::{StandardRowEvaluator, execute::Executor};
153	use reifydb_rql::flow::FlowGraphAnalyzer;
154	use reifydb_type::{RowNumber, Type};
155
156	use crate::{
157		engine::{FlowEngine, FlowEngineInner},
158		flow::FlowDiff,
159		operator::transform::registry::TransformOperatorRegistry,
160		worker::{UnitOfWork, UnitsOfWork},
161	};
162
163	/// Helper to build sources map with explicit node IDs
164	/// Maps source_id to list of (flow_id, node_id) pairs
165	/// The node_id encodes BOTH the source and flow: source_id*1000 + flow_id
166	/// This ensures unique node IDs across all source subscriptions
167	fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
168		let mut sources_map = HashMap::new();
169		for (source_id, flows) in subscriptions {
170			let source_num = match source_id {
171				SourceId::Table(tid) => tid.0,
172				_ => panic!("Only Table sources supported in tests"),
173			};
174			let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
175				.into_iter()
176				.map(|flow_id| {
177					// Node ID = source * 1000 + flow, ensuring global uniqueness
178					let node_id = FlowNodeId(source_num * 1000 + flow_id.0);
179					(flow_id, node_id)
180				})
181				.collect();
182			sources_map.insert(source_id, subscriptions_with_nodes);
183		}
184		sources_map
185	}
186
	/// Build a `FlowEngine` whose only populated state is the source
	/// subscription table — the only engine state `create_partition` reads.
	fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
		let evaluator = StandardRowEvaluator::default();
		let executor = Executor::testing();
		let registry = TransformOperatorRegistry::new();

		// Expand (source -> flows) into (source -> (flow, node)) pairs
		let sources = mk_sources(subscriptions);

		// All other engine collections stay empty; partitioning never touches them
		let inner = FlowEngineInner {
			evaluator,
			executor,
			registry,
			operators: RwLock::new(HashMap::new()),
			flows: RwLock::new(HashMap::new()),
			sources: RwLock::new(sources),
			sinks: RwLock::new(HashMap::new()),
			analyzer: RwLock::new(FlowGraphAnalyzer::new()),
			event_bus: EventBus::new(),
		};

		FlowEngine {
			inner: Arc::new(inner),
		}
	}
210
211	/// Create a test FlowDiff with identifiable data
212	fn mk_diff(label: &str) -> FlowDiff {
213		let row = mk_row(label);
214		FlowDiff::Insert {
215			post: row,
216		}
217	}
218
219	/// Create a test Row with identifiable data
220	fn mk_row(label: &str) -> Row {
221		Row {
222			number: RowNumber(label.len() as u64),
223			encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
224			layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
225		}
226	}
227
228	/// Normalize UnitsOfWork into a sorted map for comparison
229	fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
230		let mut map = BTreeMap::new();
231		for flow_units in units.into_inner() {
232			for unit in flow_units {
233				map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
234			}
235		}
236		// Sort each flow's units by version
237		for vec in map.values_mut() {
238			vec.sort_by_key(|u| u.version);
239		}
240		map
241	}
242
243	/// Extract a snapshot of a unit: (version, source_id -> diff_count)
244	/// Note: After the fix, FlowChanges use Internal origin with node_id
245	/// In tests, node_id = source_id * 1000 + flow_id, so we reverse-engineer source_id
246	fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
247		let mut sources = BTreeMap::new();
248
249		for change in &unit.source_changes {
250			match change.origin {
251				crate::flow::FlowChangeOrigin::External(source_id) => {
252					let count = change.diffs.len();
253					*sources.entry(source_id).or_insert(0) += count;
254				}
255				crate::flow::FlowChangeOrigin::Internal(node_id) => {
256					// In test setup, node_id = source_id * 1000 + flow_id
257					// So source_id = node_id / 1000
258					let source_num = node_id.0 / 1000;
259					let source_id = SourceId::Table(reifydb_core::interface::TableId(source_num));
260					let count = change.diffs.len();
261					*sources.entry(source_id).or_insert(0) += count;
262				}
263			}
264		}
265		(unit.version, sources)
266	}
267
	/// Helper to create source IDs (tests only use `Table` sources;
	/// `mk_sources` panics on any other variant)
	fn s(id: u64) -> SourceId {
		SourceId::Table(TableId(id))
	}

	/// Helper to create flow IDs
	fn f(id: u64) -> FlowId {
		FlowId(id)
	}

	/// Helper to create commit versions
	fn v(ver: u64) -> CommitVersion {
		CommitVersion(ver)
	}
282
283	/// Compare two normalized results by their snapshots
284	fn assert_normalized_eq(
285		result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
286		expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
287	) {
288		assert_eq!(result.len(), expected.len(), "Different number of flows");
289
290		for (flow_id, result_units) in result {
291			let expected_units =
292				expected.get(flow_id).expect(&format!("Flow {:?} missing in expected", flow_id));
293			assert_eq!(
294				result_units.len(),
295				expected_units.len(),
296				"Flow {:?} has different unit count",
297				flow_id
298			);
299
300			for (i, (result_unit, expected_unit)) in
301				result_units.iter().zip(expected_units.iter()).enumerate()
302			{
303				let result_snapshot = snapshot_unit(result_unit);
304				let expected_snapshot = snapshot_unit(expected_unit);
305				assert_eq!(
306					result_snapshot, expected_snapshot,
307					"Flow {:?} unit {} differs: {:?} vs {:?}",
308					flow_id, i, result_snapshot, expected_snapshot
309				);
310			}
311		}
312	}
313
314	#[test]
315	fn test_empty_input() {
316		let engine = setup_test_engine(HashMap::new());
317		let input = HashMap::new();
318
319		let result = engine.create_partition(input);
320
321		assert!(result.is_empty(), "Empty input should produce empty output");
322	}
323
324	#[test]
325	fn test_single_version_single_source_single_flow() {
326		// S1 -> F1
327		let mut subscriptions = HashMap::new();
328		subscriptions.insert(s(1), vec![f(1)]);
329		let engine = setup_test_engine(subscriptions);
330
331		// V10: S1[d1, d2]
332		let mut input = HashMap::new();
333		input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);
334
335		let result = engine.create_partition(input);
336		let normalized = normalize(result);
337
338		// Expect F1 has 1 unit at V10 with S1:2 diffs
339		assert_eq!(normalized.len(), 1);
340		let f1_units = &normalized[&f(1)];
341		assert_eq!(f1_units.len(), 1);
342
343		let (ver, sources) = snapshot_unit(&f1_units[0]);
344		assert_eq!(ver, v(10));
345		assert_eq!(sources.get(&s(1)), Some(&2));
346	}
347
348	#[test]
349	fn test_single_version_multi_flow_fanout() {
350		// S1 -> [F1, F2, F3]
351		let mut subscriptions = HashMap::new();
352		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
353		let engine = setup_test_engine(subscriptions);
354
355		// V1: S1[d1]
356		let mut input = HashMap::new();
357		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
358
359		let result = engine.create_partition(input);
360		let normalized = normalize(result);
361
362		// Expect F1, F2, F3 each have 1 unit at V1 with S1:1
363		assert_eq!(normalized.len(), 3);
364
365		for flow_id in [f(1), f(2), f(3)] {
366			let units = &normalized[&flow_id];
367			assert_eq!(units.len(), 1);
368			let (ver, sources) = snapshot_unit(&units[0]);
369			assert_eq!(ver, v(1));
370			assert_eq!(sources.get(&s(1)), Some(&1));
371		}
372	}
373
374	#[test]
375	fn test_single_version_multi_source_partial_overlap() {
376		// S1 -> [F1, F2], S2 -> [F2, F3]
377		let mut subscriptions = HashMap::new();
378		subscriptions.insert(s(1), vec![f(1), f(2)]);
379		subscriptions.insert(s(2), vec![f(2), f(3)]);
380		let engine = setup_test_engine(subscriptions);
381
382		// V7: S1[a], S2[b,c]
383		let mut input = HashMap::new();
384		input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
385
386		let result = engine.create_partition(input);
387		let normalized = normalize(result);
388
389		assert_eq!(normalized.len(), 3);
390
391		// F1 @V7: S1:1
392		let f1_units = &normalized[&f(1)];
393		assert_eq!(f1_units.len(), 1);
394		let (ver, sources) = snapshot_unit(&f1_units[0]);
395		assert_eq!(ver, v(7));
396		assert_eq!(sources.len(), 1);
397		assert_eq!(sources.get(&s(1)), Some(&1));
398
399		// F2 @V7: S1:1, S2:2
400		let f2_units = &normalized[&f(2)];
401		assert_eq!(f2_units.len(), 1);
402		let (ver, sources) = snapshot_unit(&f2_units[0]);
403		assert_eq!(ver, v(7));
404		assert_eq!(sources.len(), 2);
405		assert_eq!(sources.get(&s(1)), Some(&1));
406		assert_eq!(sources.get(&s(2)), Some(&2));
407
408		// F3 @V7: S2:2
409		let f3_units = &normalized[&f(3)];
410		assert_eq!(f3_units.len(), 1);
411		let (ver, sources) = snapshot_unit(&f3_units[0]);
412		assert_eq!(ver, v(7));
413		assert_eq!(sources.len(), 1);
414		assert_eq!(sources.get(&s(2)), Some(&2));
415	}
416
417	#[test]
418	fn test_unknown_source_filtered() {
419		// S1 -> [F1]
420		let mut subscriptions = HashMap::new();
421		subscriptions.insert(s(1), vec![f(1)]);
422		let engine = setup_test_engine(subscriptions);
423
424		// V3: S999[x] (unknown source)
425		let mut input = HashMap::new();
426		input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);
427
428		let result = engine.create_partition(input);
429
430		assert!(result.is_empty(), "Unknown sources should produce no units");
431	}
432
433	#[test]
434	fn test_multi_version_ordering() {
435		// S1 -> [F1], S2 -> [F2]
436		let mut subscriptions = HashMap::new();
437		subscriptions.insert(s(1), vec![f(1)]);
438		subscriptions.insert(s(2), vec![f(2)]);
439		let engine = setup_test_engine(subscriptions);
440
441		// Input versions intentionally unsorted: V20, V10, V30
442		let mut input = HashMap::new();
443		input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
444		input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
445		input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);
446
447		let result = engine.create_partition(input);
448		let normalized = normalize(result);
449
450		// F1: units at V10 then V20 (ascending)
451		let f1_units = &normalized[&f(1)];
452		assert_eq!(f1_units.len(), 2);
453		assert_eq!(f1_units[0].version, v(10));
454		assert_eq!(f1_units[1].version, v(20));
455
456		// F2: single unit at V30
457		let f2_units = &normalized[&f(2)];
458		assert_eq!(f2_units.len(), 1);
459		assert_eq!(f2_units[0].version, v(30));
460	}
461
462	#[test]
463	fn test_version_gaps_preserved() {
464		// S1 -> [F1]
465		let mut subscriptions = HashMap::new();
466		subscriptions.insert(s(1), vec![f(1)]);
467		let engine = setup_test_engine(subscriptions);
468
469		// V1, V100 (non-contiguous)
470		let mut input = HashMap::new();
471		input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
472		input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);
473
474		let result = engine.create_partition(input);
475		let normalized = normalize(result);
476
477		// F1 has exactly 2 units at V1 and V100
478		let f1_units = &normalized[&f(1)];
479		assert_eq!(f1_units.len(), 2);
480		assert_eq!(f1_units[0].version, v(1));
481		assert_eq!(f1_units[1].version, v(100));
482	}
483
484	#[test]
485	fn test_duplicate_source_entries_merged() {
486		// S1 -> [F1]
487		let mut subscriptions = HashMap::new();
488		subscriptions.insert(s(1), vec![f(1)]);
489		let engine = setup_test_engine(subscriptions);
490
491		// V5: S1[x,y], S1[z] (duplicate source entries)
492		let mut input = HashMap::new();
493		input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);
494
495		let result = engine.create_partition(input);
496		let normalized = normalize(result);
497
498		// F1 @V5 should have S1:3 diffs (merged)
499		let f1_units = &normalized[&f(1)];
500		assert_eq!(f1_units.len(), 1);
501		let (ver, sources) = snapshot_unit(&f1_units[0]);
502		assert_eq!(ver, v(5));
503		assert_eq!(sources.get(&s(1)), Some(&3));
504	}
505
506	#[test]
507	fn test_flow_with_multiple_sources_same_version() {
508		// S1 -> [F1], S2 -> [F1]
509		let mut subscriptions = HashMap::new();
510		subscriptions.insert(s(1), vec![f(1)]);
511		subscriptions.insert(s(2), vec![f(1)]);
512		let engine = setup_test_engine(subscriptions);
513
514		// V8: S1[a], S2[b,c]
515		let mut input = HashMap::new();
516		input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
517
518		let result = engine.create_partition(input);
519		let normalized = normalize(result);
520
521		// F1 @V8 has two sources: S1:1, S2:2
522		let f1_units = &normalized[&f(1)];
523		assert_eq!(f1_units.len(), 1);
524		let (ver, sources) = snapshot_unit(&f1_units[0]);
525		assert_eq!(ver, v(8));
526		assert_eq!(sources.len(), 2);
527		assert_eq!(sources.get(&s(1)), Some(&1));
528		assert_eq!(sources.get(&s(2)), Some(&2));
529	}
530
531	#[test]
532	fn test_no_work_for_unaffected_flows() {
533		// S1 -> [F1], S2 -> [F2]
534		let mut subscriptions = HashMap::new();
535		subscriptions.insert(s(1), vec![f(1)]);
536		subscriptions.insert(s(2), vec![f(2)]);
537		let engine = setup_test_engine(subscriptions);
538
539		// V2: S1[a] only
540		let mut input = HashMap::new();
541		input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);
542
543		let result = engine.create_partition(input);
544		let normalized = normalize(result);
545
546		// Only F1 should have work
547		assert_eq!(normalized.len(), 1);
548		assert!(normalized.contains_key(&f(1)));
549		assert!(!normalized.contains_key(&f(2)));
550	}
551
552	#[test]
553	fn test_complex_multi_flow_multi_version() {
554		// S1 -> [F1,F2], S2 -> [F2]
555		let mut subscriptions = HashMap::new();
556		subscriptions.insert(s(1), vec![f(1), f(2)]);
557		subscriptions.insert(s(2), vec![f(2)]);
558		let engine = setup_test_engine(subscriptions);
559
560		// V1: S1[a1], S2[b1]
561		// V2: S1[a2]
562		let mut input = HashMap::new();
563		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
564		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
565
566		let result = engine.create_partition(input);
567		let normalized = normalize(result);
568
569		// F1: V1 {S1:1}, V2 {S1:1}
570		let f1_units = &normalized[&f(1)];
571		assert_eq!(f1_units.len(), 2);
572
573		let (ver1, sources1) = snapshot_unit(&f1_units[0]);
574		assert_eq!(ver1, v(1));
575		assert_eq!(sources1.get(&s(1)), Some(&1));
576
577		let (ver2, sources2) = snapshot_unit(&f1_units[1]);
578		assert_eq!(ver2, v(2));
579		assert_eq!(sources2.get(&s(1)), Some(&1));
580
581		// F2: V1 {S1:1, S2:1}, V2 {S1:1}
582		let f2_units = &normalized[&f(2)];
583		assert_eq!(f2_units.len(), 2);
584
585		let (ver1, sources1) = snapshot_unit(&f2_units[0]);
586		assert_eq!(ver1, v(1));
587		assert_eq!(sources1.get(&s(1)), Some(&1));
588		assert_eq!(sources1.get(&s(2)), Some(&1));
589
590		let (ver2, sources2) = snapshot_unit(&f2_units[1]);
591		assert_eq!(ver2, v(2));
592		assert_eq!(sources2.get(&s(1)), Some(&1));
593	}
594
595	#[test]
596	fn test_large_diffs_zero_subscribers() {
597		let engine = setup_test_engine(HashMap::new());
598
599		// Many diffs but no subscribers
600		let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
601		let mut input = HashMap::new();
602		input.insert(v(1), vec![(s(1), diffs)]);
603
604		let result = engine.create_partition(input);
605
606		assert!(result.is_empty(), "No subscribers means no units");
607	}
608
609	#[test]
610	fn test_many_versions_sparse_changes() {
611		// S1 -> [F1]
612		let mut subscriptions = HashMap::new();
613		subscriptions.insert(s(1), vec![f(1)]);
614		let engine = setup_test_engine(subscriptions);
615
616		// 100 versions, but flow only affected by versions 10, 50, 90
617		let mut input = HashMap::new();
618		for i in 1..=100 {
619			if i == 10 || i == 50 || i == 90 {
620				input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
621			} else {
622				// Other sources not subscribed by F1
623				input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
624			}
625		}
626
627		let result = engine.create_partition(input);
628		let normalized = normalize(result);
629
630		// F1 should only have 3 units
631		let f1_units = &normalized[&f(1)];
632		assert_eq!(f1_units.len(), 3);
633		assert_eq!(f1_units[0].version, v(10));
634		assert_eq!(f1_units[1].version, v(50));
635		assert_eq!(f1_units[2].version, v(90));
636	}
637
638	#[test]
639	fn test_many_sources_selective_subscription() {
640		// F1 subscribes to only S5, S15, S25, S35, S45 out of 50 sources
641		let mut subscriptions = HashMap::new();
642		for i in 1..=50 {
643			if i % 10 == 5 {
644				subscriptions.insert(s(i), vec![f(1)]);
645			}
646		}
647		let engine = setup_test_engine(subscriptions);
648
649		// All 50 sources change
650		let mut changes = vec![];
651		for i in 1..=50 {
652			changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
653		}
654		let mut input = HashMap::new();
655		input.insert(v(1), changes);
656
657		let result = engine.create_partition(input);
658		let normalized = normalize(result);
659
660		// F1 should have 1 unit with exactly 5 sources
661		let f1_units = &normalized[&f(1)];
662		assert_eq!(f1_units.len(), 1);
663		let (_, sources) = snapshot_unit(&f1_units[0]);
664		assert_eq!(sources.len(), 5);
665		assert!(sources.contains_key(&s(5)));
666		assert!(sources.contains_key(&s(15)));
667		assert!(sources.contains_key(&s(25)));
668		assert!(sources.contains_key(&s(35)));
669		assert!(sources.contains_key(&s(45)));
670	}
671
672	#[test]
673	fn test_input_permutation_invariance() {
674		// S1 -> [F1, F2]
675		let mut subscriptions = HashMap::new();
676		subscriptions.insert(s(1), vec![f(1), f(2)]);
677		let engine = setup_test_engine(subscriptions);
678
679		// Create input with versions in different orders
680		let test_versions =
681			vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];
682
683		let mut results = vec![];
684		for versions in test_versions {
685			let mut input = HashMap::new();
686			for ver in versions {
687				input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
688			}
689			let result = engine.create_partition(input);
690			results.push(normalize(result));
691		}
692
693		// All permutations should produce the same normalized output
694		for i in 1..results.len() {
695			assert_normalized_eq(&results[0], &results[i]);
696		}
697
698		// Verify correct ordering
699		let f1_units = &results[0][&f(1)];
700		assert_eq!(f1_units.len(), 3);
701		assert_eq!(f1_units[0].version, v(10));
702		assert_eq!(f1_units[1].version, v(20));
703		assert_eq!(f1_units[2].version, v(30));
704	}
705
706	#[test]
707	fn test_empty_diff_vec_handling() {
708		// S1 -> [F1]
709		let mut subscriptions = HashMap::new();
710		subscriptions.insert(s(1), vec![f(1)]);
711		let engine = setup_test_engine(subscriptions);
712
713		// V1: S1 with empty diff vec
714		let mut input = HashMap::new();
715		input.insert(v(1), vec![(s(1), vec![])]);
716
717		let result = engine.create_partition(input);
718		let normalized = normalize(result);
719
720		// Current behavior: empty diffs still create a unit with 0 count
721		// This documents the actual behavior
722		let f1_units = &normalized[&f(1)];
723		assert_eq!(f1_units.len(), 1);
724		let (ver, sources) = snapshot_unit(&f1_units[0]);
725		assert_eq!(ver, v(1));
726		assert_eq!(sources.get(&s(1)), Some(&0));
727	}
728
729	#[test]
730	fn test_all_sources_unknown() {
731		// S1 -> [F1]
732		let mut subscriptions = HashMap::new();
733		subscriptions.insert(s(1), vec![f(1)]);
734		let engine = setup_test_engine(subscriptions);
735
736		// All input sources are unknown
737		let mut input = HashMap::new();
738		input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
739		input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);
740
741		let result = engine.create_partition(input);
742
743		assert!(result.is_empty(), "All unknown sources should produce empty output");
744	}
745
746	#[test]
747	fn test_permutation_regression_fanout() {
748		// S1 -> [F1, F2, F3]
749		let mut subscriptions = HashMap::new();
750		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
751		let engine = setup_test_engine(subscriptions);
752
753		// Original input
754		let mut input = HashMap::new();
755		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
756
757		let expected = normalize(engine.create_partition(input.clone()));
758
759		// Test 5 random permutations
760		for _ in 0..5 {
761			let mut entries: Vec<_> = input.clone().into_iter().collect();
762			entries.shuffle(&mut rand::rng());
763			let shuffled: HashMap<_, _> = entries.into_iter().collect();
764
765			let result = normalize(engine.create_partition(shuffled));
766			assert_normalized_eq(&result, &expected);
767		}
768	}
769
770	#[test]
771	fn test_permutation_regression_complex() {
772		use rand::prelude::*;
773
774		// S1 -> [F1,F2], S2 -> [F2]
775		let mut subscriptions = HashMap::new();
776		subscriptions.insert(s(1), vec![f(1), f(2)]);
777		subscriptions.insert(s(2), vec![f(2)]);
778		let engine = setup_test_engine(subscriptions);
779
780		// Original input
781		let mut input = HashMap::new();
782		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
783		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
784
785		let expected = normalize(engine.create_partition(input.clone()));
786
787		// Test 5 random permutations
788		for _ in 0..5 {
789			let mut entries: Vec<_> = input.clone().into_iter().collect();
790			entries.shuffle(&mut rand::rng());
791			let shuffled: HashMap<_, _> = entries.into_iter().collect();
792
793			let result = normalize(engine.create_partition(shuffled));
794			assert_normalized_eq(&result, &expected);
795		}
796	}
797
798	#[test]
799	fn test_large_input_smoke() {
800		// 20 sources, 20 flows, sparse subscriptions
801		let mut subscriptions = HashMap::new();
802		for flow_i in 1..=20 {
803			// Each flow subscribes to 3 sources
804			for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
805				let source_id = s(source_i % 20 + 1);
806				subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
807			}
808		}
809		let engine = setup_test_engine(subscriptions);
810
811		// 1000 versions, each with 5 random sources changing
812		let mut input = HashMap::new();
813		for ver_i in 1..=1000 {
814			let mut changes = vec![];
815			for source_i in 1..=5 {
816				changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
817			}
818			input.insert(v(ver_i), changes);
819		}
820
821		// Should complete without panic
822		let result = engine.create_partition(input);
823		let normalized = normalize(result);
824
825		// Basic sanity checks
826		assert!(!normalized.is_empty());
827
828		// Verify all flows have ordered versions
829		for (_, units) in normalized {
830			for i in 1..units.len() {
831				assert!(units[i - 1].version < units[i].version, "Versions not sorted");
832			}
833		}
834	}
835
836	#[test]
837	fn test_version_ordering_maintained_under_stress() {
838		// S1 -> [F1]
839		let mut subscriptions = HashMap::new();
840		subscriptions.insert(s(1), vec![f(1)]);
841		let engine = setup_test_engine(subscriptions);
842
843		// 100 versions in completely scrambled order
844		let mut rng = rng();
845		let mut versions: Vec<u64> = (1..=100).collect();
846		for _ in 0..10 {
847			versions.shuffle(&mut rng);
848		}
849
850		let mut input = HashMap::new();
851		for ver in versions {
852			input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
853		}
854
855		let result = engine.create_partition(input);
856		let normalized = normalize(result);
857
858		// F1 should have 100 units in perfect ascending order
859		let f1_units = &normalized[&f(1)];
860		assert_eq!(f1_units.len(), 100);
861
862		for i in 0..100 {
863			assert_eq!(f1_units[i].version, v((i + 1) as u64));
864		}
865	}
866}