reifydb_sub_flow/engine/partition.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::BTreeMap;
5
6use indexmap::IndexMap;
7use reifydb_core::{
8	CommitVersion,
9	interface::{FlowId, SourceId},
10};
11use reifydb_flow_operator_sdk::{FlowChange, FlowDiff};
12use tracing::trace;
13
14use crate::worker::{UnitOfWork, UnitsOfWork};
15
16impl crate::engine::FlowEngine {
17	/// Partition changes from multiple versions into units of work grouped by flow
18	///
19	/// This method handles the complete partitioning logic:
20	/// 1. Processes each version's changes separately
21	/// 2. Groups units by flow across all versions
22	/// 3. Maintains version ordering within each flow
23	///
24	/// # Arguments
25	/// * `changes_by_version` - Map of version to (source_id, changes) pairs
26	///
27	/// # Returns
28	/// UnitsOfWork where each flow has its units ordered by version
29	pub fn create_partition(
30		&self,
31		changes_by_version: BTreeMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
32	) -> UnitsOfWork {
33		let mut all_units_by_flow: BTreeMap<FlowId, Vec<UnitOfWork>> = BTreeMap::new();
34
35		// BTreeMap is already sorted by key, so we iterate in version order
36		for (version, changes) in &changes_by_version {
37			let version = *version;
38
39			// Group changes by source for this version
40			// Using IndexMap to preserve CDC insertion order within the version
41			let mut changes_by_source: IndexMap<SourceId, Vec<FlowDiff>> = IndexMap::new();
42			for (source_id, diffs) in changes {
43				changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
44			}
45
46			// Partition this version's changes into units of work
47			let version_units = self.partition_into_units_of_work(changes_by_source, version);
48
49			// Merge units from this version into the overall collection
50			// Each flow's units are stored in a Vec to maintain version ordering
51			for flow_units in version_units.into_inner() {
52				for unit in flow_units {
53					all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
54				}
55			}
56		}
57
58		// Convert the HashMap to UnitsOfWork for the worker
59		let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
60
61		// Log partition output
62		for (seq, flow_units) in units_vec.iter().enumerate() {
63			if !flow_units.is_empty() {
64				let flow_id = flow_units[0].flow_id;
65				let versions: Vec<_> = flow_units.iter().map(|u| u.version.0).collect();
66				trace!("[PARTITION] OUT seq={} flow={:?} versions={:?}", seq, flow_id, versions);
67			}
68		}
69
70		// INVARIANT: Validate that each flow_id appears exactly once in the output
71		// and that each inner Vec contains units for only one flow
72		{
73			use std::collections::HashSet;
74			let mut seen_flows = HashSet::new();
75
76			for flow_units in &units_vec {
77				assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
78
79				let flow_id = flow_units[0].flow_id;
80				assert!(
81					!seen_flows.contains(&flow_id),
82					"INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
83					This means the same flow will be processed by multiple parallel tasks, \
84					causing keyspace overlap.",
85					flow_id
86				);
87
88				// Validate all units in this Vec are for the same flow
89				for unit in flow_units {
90					assert_eq!(
91						unit.flow_id, flow_id,
92						"INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
93						All units in a Vec must belong to the same flow.",
94						flow_id, unit.flow_id
95					);
96				}
97
98				seen_flows.insert(flow_id);
99			}
100		}
101
102		UnitsOfWork::new(units_vec)
103	}
104
105	fn partition_into_units_of_work(
106		&self,
107		changes_by_source: IndexMap<SourceId, Vec<FlowDiff>>,
108		version: CommitVersion,
109	) -> UnitsOfWork {
110		// Map to collect all source changes per flow
111		let mut flow_changes: BTreeMap<FlowId, Vec<FlowChange>> = BTreeMap::new();
112
113		// Read source subscriptions and backfill versions
114		let sources = self.inner.sources.read();
115		let flow_creation_version = self.inner.flow_creation_versions.read();
116
117		// For each source that changed
118		for (source_id, diffs) in changes_by_source {
119			// Find all flows subscribed to this source
120			if let Some(subscriptions) = sources.get(&source_id) {
121				for (flow_id, node_id) in subscriptions {
122					// Skip CDC events that were already included in the backfill
123					if let Some(&flow_creation_version) = flow_creation_version.get(flow_id) {
124						if version < flow_creation_version {
125							continue;
126						}
127					}
128
129					// Create FlowChange scoped to the specific node in this flow
130					// This ensures each flow only processes its own nodes, preventing keyspace
131					// overlap
132					let change = FlowChange::internal(*node_id, version, diffs.clone());
133					flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
134				}
135			}
136		}
137
138		// Group all units at this version into a single UnitsOfWork
139		// Since all units are at the same version, each flow gets exactly one unit
140		let units: Vec<Vec<UnitOfWork>> = flow_changes
141			.into_iter()
142			.map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
143			.collect();
144
145		UnitsOfWork::new(units)
146	}
147}
148
149#[cfg(test)]
150mod tests {
151	use std::{
152		collections::{BTreeMap, HashMap},
153		sync::Arc,
154	};
155
156	use parking_lot::RwLock;
157	use rand::{rng, seq::SliceRandom};
158	use reifydb_core::{
159		CommitVersion, Row,
160		event::EventBus,
161		interface::{FlowId, FlowNodeId, SourceId, TableId},
162		util::CowVec,
163		value::encoded::{EncodedValues, EncodedValuesNamedLayout},
164	};
165	use reifydb_engine::{StandardRowEvaluator, execute::Executor};
166	use reifydb_flow_operator_sdk::{FlowChangeOrigin, FlowDiff};
167	use reifydb_rql::flow::FlowGraphAnalyzer;
168	use reifydb_type::{RowNumber, Type};
169
170	use crate::{
171		engine::{FlowEngine, FlowEngineInner},
172		operator::transform::registry::TransformOperatorRegistry,
173		worker::{UnitOfWork, UnitsOfWork},
174	};
175
176	/// Helper to build sources map with explicit node IDs
177	/// Maps source_id to list of (flow_id, node_id) pairs
178	/// The node_id encodes BOTH the source and flow: source_id*1000 + flow_id
179	/// This ensures unique node IDs across all source subscriptions
180	fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
181		let mut sources_map = HashMap::new();
182		for (source_id, flows) in subscriptions {
183			let source_num = match source_id {
184				SourceId::Table(tid) => tid.0,
185				_ => panic!("Only Table sources supported in tests"),
186			};
187			let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
188				.into_iter()
189				.map(|flow_id| {
190					// Node ID = source * 1000 + flow, ensuring global uniqueness
191					let node_id = FlowNodeId(source_num * 1000 + flow_id.0);
192					(flow_id, node_id)
193				})
194				.collect();
195			sources_map.insert(source_id, subscriptions_with_nodes);
196		}
197		sources_map
198	}
199
200	fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
201		let evaluator = StandardRowEvaluator::default();
202		let executor = Executor::testing();
203		let registry = TransformOperatorRegistry::new();
204
205		let sources = mk_sources(subscriptions);
206
207		let inner = FlowEngineInner {
208			evaluator,
209			executor,
210			registry,
211			operators: RwLock::new(HashMap::new()),
212			flows: RwLock::new(HashMap::new()),
213			sources: RwLock::new(sources),
214			sinks: RwLock::new(HashMap::new()),
215			analyzer: RwLock::new(FlowGraphAnalyzer::new()),
216			event_bus: EventBus::new(),
217			flow_creation_versions: RwLock::new(HashMap::new()),
218		};
219
220		FlowEngine {
221			inner: Arc::new(inner),
222		}
223	}
224
225	/// Create a test FlowDiff with identifiable data
226	fn mk_diff(label: &str) -> FlowDiff {
227		let row = mk_row(label);
228		FlowDiff::Insert {
229			post: row,
230		}
231	}
232
233	/// Create a test Row with identifiable data
234	fn mk_row(label: &str) -> Row {
235		Row {
236			number: RowNumber(label.len() as u64),
237			encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
238			layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
239		}
240	}
241
242	/// Normalize UnitsOfWork into a sorted map for comparison
243	fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
244		let mut map = BTreeMap::new();
245		for flow_units in units.into_inner() {
246			for unit in flow_units {
247				map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
248			}
249		}
250		// Sort each flow's units by version
251		for vec in map.values_mut() {
252			vec.sort_by_key(|u| u.version);
253		}
254		map
255	}
256
257	/// Extract a snapshot of a unit: (version, source_id -> diff_count)
258	/// Note: After the fix, FlowChanges use Internal origin with node_id
259	/// In tests, node_id = source_id * 1000 + flow_id, so we reverse-engineer source_id
260	fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
261		let mut sources = BTreeMap::new();
262
263		for change in &unit.source_changes {
264			match change.origin {
265				FlowChangeOrigin::External(source_id) => {
266					let count = change.diffs.len();
267					*sources.entry(source_id).or_insert(0) += count;
268				}
269				FlowChangeOrigin::Internal(node_id) => {
270					// In test setup, node_id = source_id * 1000 + flow_id
271					// So source_id = node_id / 1000
272					let source_num = node_id.0 / 1000;
273					let source_id = SourceId::Table(TableId(source_num));
274					let count = change.diffs.len();
275					*sources.entry(source_id).or_insert(0) += count;
276				}
277			}
278		}
279		(unit.version, sources)
280	}
281
	/// Shorthand for building a table-backed source id in tests.
	fn s(id: u64) -> SourceId {
		SourceId::Table(TableId(id))
	}
286
	/// Shorthand for building a flow id in tests.
	fn f(id: u64) -> FlowId {
		FlowId(id)
	}
291
	/// Shorthand for building a commit version in tests.
	fn v(ver: u64) -> CommitVersion {
		CommitVersion(ver)
	}
296
297	/// Compare two normalized results by their snapshots
298	fn assert_normalized_eq(
299		result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
300		expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
301	) {
302		assert_eq!(result.len(), expected.len(), "Different number of flows");
303
304		for (flow_id, result_units) in result {
305			let expected_units =
306				expected.get(flow_id).expect(&format!("Flow {:?} missing in expected", flow_id));
307			assert_eq!(
308				result_units.len(),
309				expected_units.len(),
310				"Flow {:?} has different unit count",
311				flow_id
312			);
313
314			for (i, (result_unit, expected_unit)) in
315				result_units.iter().zip(expected_units.iter()).enumerate()
316			{
317				let result_snapshot = snapshot_unit(result_unit);
318				let expected_snapshot = snapshot_unit(expected_unit);
319				assert_eq!(
320					result_snapshot, expected_snapshot,
321					"Flow {:?} unit {} differs: {:?} vs {:?}",
322					flow_id, i, result_snapshot, expected_snapshot
323				);
324			}
325		}
326	}
327
328	#[test]
329	fn test_empty_input() {
330		let engine = setup_test_engine(HashMap::new());
331		let input = BTreeMap::new();
332
333		let result = engine.create_partition(input);
334
335		assert!(result.is_empty(), "Empty input should produce empty output");
336	}
337
338	#[test]
339	fn test_single_version_single_source_single_flow() {
340		// S1 -> F1
341		let mut subscriptions = HashMap::new();
342		subscriptions.insert(s(1), vec![f(1)]);
343		let engine = setup_test_engine(subscriptions);
344
345		// V10: S1[d1, d2]
346		let mut input = BTreeMap::new();
347		input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);
348
349		let result = engine.create_partition(input);
350		let normalized = normalize(result);
351
352		// Expect F1 has 1 unit at V10 with S1:2 diffs
353		assert_eq!(normalized.len(), 1);
354		let f1_units = &normalized[&f(1)];
355		assert_eq!(f1_units.len(), 1);
356
357		let (ver, sources) = snapshot_unit(&f1_units[0]);
358		assert_eq!(ver, v(10));
359		assert_eq!(sources.get(&s(1)), Some(&2));
360	}
361
362	#[test]
363	fn test_single_version_multi_flow_fanout() {
364		// S1 -> [F1, F2, F3]
365		let mut subscriptions = HashMap::new();
366		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
367		let engine = setup_test_engine(subscriptions);
368
369		// V1: S1[d1]
370		let mut input = BTreeMap::new();
371		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
372
373		let result = engine.create_partition(input);
374		let normalized = normalize(result);
375
376		// Expect F1, F2, F3 each have 1 unit at V1 with S1:1
377		assert_eq!(normalized.len(), 3);
378
379		for flow_id in [f(1), f(2), f(3)] {
380			let units = &normalized[&flow_id];
381			assert_eq!(units.len(), 1);
382			let (ver, sources) = snapshot_unit(&units[0]);
383			assert_eq!(ver, v(1));
384			assert_eq!(sources.get(&s(1)), Some(&1));
385		}
386	}
387
388	#[test]
389	fn test_single_version_multi_source_partial_overlap() {
390		// S1 -> [F1, F2], S2 -> [F2, F3]
391		let mut subscriptions = HashMap::new();
392		subscriptions.insert(s(1), vec![f(1), f(2)]);
393		subscriptions.insert(s(2), vec![f(2), f(3)]);
394		let engine = setup_test_engine(subscriptions);
395
396		// V7: S1[a], S2[b,c]
397		let mut input = BTreeMap::new();
398		input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
399
400		let result = engine.create_partition(input);
401		let normalized = normalize(result);
402
403		assert_eq!(normalized.len(), 3);
404
405		// F1 @V7: S1:1
406		let f1_units = &normalized[&f(1)];
407		assert_eq!(f1_units.len(), 1);
408		let (ver, sources) = snapshot_unit(&f1_units[0]);
409		assert_eq!(ver, v(7));
410		assert_eq!(sources.len(), 1);
411		assert_eq!(sources.get(&s(1)), Some(&1));
412
413		// F2 @V7: S1:1, S2:2
414		let f2_units = &normalized[&f(2)];
415		assert_eq!(f2_units.len(), 1);
416		let (ver, sources) = snapshot_unit(&f2_units[0]);
417		assert_eq!(ver, v(7));
418		assert_eq!(sources.len(), 2);
419		assert_eq!(sources.get(&s(1)), Some(&1));
420		assert_eq!(sources.get(&s(2)), Some(&2));
421
422		// F3 @V7: S2:2
423		let f3_units = &normalized[&f(3)];
424		assert_eq!(f3_units.len(), 1);
425		let (ver, sources) = snapshot_unit(&f3_units[0]);
426		assert_eq!(ver, v(7));
427		assert_eq!(sources.len(), 1);
428		assert_eq!(sources.get(&s(2)), Some(&2));
429	}
430
431	#[test]
432	fn test_unknown_source_filtered() {
433		// S1 -> [F1]
434		let mut subscriptions = HashMap::new();
435		subscriptions.insert(s(1), vec![f(1)]);
436		let engine = setup_test_engine(subscriptions);
437
438		// V3: S999[x] (unknown source)
439		let mut input = BTreeMap::new();
440		input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);
441
442		let result = engine.create_partition(input);
443
444		assert!(result.is_empty(), "Unknown sources should produce no units");
445	}
446
447	#[test]
448	fn test_multi_version_ordering() {
449		// S1 -> [F1], S2 -> [F2]
450		let mut subscriptions = HashMap::new();
451		subscriptions.insert(s(1), vec![f(1)]);
452		subscriptions.insert(s(2), vec![f(2)]);
453		let engine = setup_test_engine(subscriptions);
454
455		// Input versions intentionally unsorted: V20, V10, V30
456		let mut input = BTreeMap::new();
457		input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
458		input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
459		input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);
460
461		let result = engine.create_partition(input);
462		let normalized = normalize(result);
463
464		// F1: units at V10 then V20 (ascending)
465		let f1_units = &normalized[&f(1)];
466		assert_eq!(f1_units.len(), 2);
467		assert_eq!(f1_units[0].version, v(10));
468		assert_eq!(f1_units[1].version, v(20));
469
470		// F2: single unit at V30
471		let f2_units = &normalized[&f(2)];
472		assert_eq!(f2_units.len(), 1);
473		assert_eq!(f2_units[0].version, v(30));
474	}
475
476	#[test]
477	fn test_version_gaps_preserved() {
478		// S1 -> [F1]
479		let mut subscriptions = HashMap::new();
480		subscriptions.insert(s(1), vec![f(1)]);
481		let engine = setup_test_engine(subscriptions);
482
483		// V1, V100 (non-contiguous)
484		let mut input = BTreeMap::new();
485		input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
486		input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);
487
488		let result = engine.create_partition(input);
489		let normalized = normalize(result);
490
491		// F1 has exactly 2 units at V1 and V100
492		let f1_units = &normalized[&f(1)];
493		assert_eq!(f1_units.len(), 2);
494		assert_eq!(f1_units[0].version, v(1));
495		assert_eq!(f1_units[1].version, v(100));
496	}
497
498	#[test]
499	fn test_duplicate_source_entries_merged() {
500		// S1 -> [F1]
501		let mut subscriptions = HashMap::new();
502		subscriptions.insert(s(1), vec![f(1)]);
503		let engine = setup_test_engine(subscriptions);
504
505		// V5: S1[x,y], S1[z] (duplicate source entries)
506		let mut input = BTreeMap::new();
507		input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);
508
509		let result = engine.create_partition(input);
510		let normalized = normalize(result);
511
512		// F1 @V5 should have S1:3 diffs (merged)
513		let f1_units = &normalized[&f(1)];
514		assert_eq!(f1_units.len(), 1);
515		let (ver, sources) = snapshot_unit(&f1_units[0]);
516		assert_eq!(ver, v(5));
517		assert_eq!(sources.get(&s(1)), Some(&3));
518	}
519
520	#[test]
521	fn test_flow_with_multiple_sources_same_version() {
522		// S1 -> [F1], S2 -> [F1]
523		let mut subscriptions = HashMap::new();
524		subscriptions.insert(s(1), vec![f(1)]);
525		subscriptions.insert(s(2), vec![f(1)]);
526		let engine = setup_test_engine(subscriptions);
527
528		// V8: S1[a], S2[b,c]
529		let mut input = BTreeMap::new();
530		input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);
531
532		let result = engine.create_partition(input);
533		let normalized = normalize(result);
534
535		// F1 @V8 has two sources: S1:1, S2:2
536		let f1_units = &normalized[&f(1)];
537		assert_eq!(f1_units.len(), 1);
538		let (ver, sources) = snapshot_unit(&f1_units[0]);
539		assert_eq!(ver, v(8));
540		assert_eq!(sources.len(), 2);
541		assert_eq!(sources.get(&s(1)), Some(&1));
542		assert_eq!(sources.get(&s(2)), Some(&2));
543	}
544
545	#[test]
546	fn test_no_work_for_unaffected_flows() {
547		// S1 -> [F1], S2 -> [F2]
548		let mut subscriptions = HashMap::new();
549		subscriptions.insert(s(1), vec![f(1)]);
550		subscriptions.insert(s(2), vec![f(2)]);
551		let engine = setup_test_engine(subscriptions);
552
553		// V2: S1[a] only
554		let mut input = BTreeMap::new();
555		input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);
556
557		let result = engine.create_partition(input);
558		let normalized = normalize(result);
559
560		// Only F1 should have work
561		assert_eq!(normalized.len(), 1);
562		assert!(normalized.contains_key(&f(1)));
563		assert!(!normalized.contains_key(&f(2)));
564	}
565
566	#[test]
567	fn test_complex_multi_flow_multi_version() {
568		// S1 -> [F1,F2], S2 -> [F2]
569		let mut subscriptions = HashMap::new();
570		subscriptions.insert(s(1), vec![f(1), f(2)]);
571		subscriptions.insert(s(2), vec![f(2)]);
572		let engine = setup_test_engine(subscriptions);
573
574		// V1: S1[a1], S2[b1]
575		// V2: S1[a2]
576		let mut input = BTreeMap::new();
577		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
578		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
579
580		let result = engine.create_partition(input);
581		let normalized = normalize(result);
582
583		// F1: V1 {S1:1}, V2 {S1:1}
584		let f1_units = &normalized[&f(1)];
585		assert_eq!(f1_units.len(), 2);
586
587		let (ver1, sources1) = snapshot_unit(&f1_units[0]);
588		assert_eq!(ver1, v(1));
589		assert_eq!(sources1.get(&s(1)), Some(&1));
590
591		let (ver2, sources2) = snapshot_unit(&f1_units[1]);
592		assert_eq!(ver2, v(2));
593		assert_eq!(sources2.get(&s(1)), Some(&1));
594
595		// F2: V1 {S1:1, S2:1}, V2 {S1:1}
596		let f2_units = &normalized[&f(2)];
597		assert_eq!(f2_units.len(), 2);
598
599		let (ver1, sources1) = snapshot_unit(&f2_units[0]);
600		assert_eq!(ver1, v(1));
601		assert_eq!(sources1.get(&s(1)), Some(&1));
602		assert_eq!(sources1.get(&s(2)), Some(&1));
603
604		let (ver2, sources2) = snapshot_unit(&f2_units[1]);
605		assert_eq!(ver2, v(2));
606		assert_eq!(sources2.get(&s(1)), Some(&1));
607	}
608
609	#[test]
610	fn test_large_diffs_zero_subscribers() {
611		let engine = setup_test_engine(HashMap::new());
612
613		// Many diffs but no subscribers
614		let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
615		let mut input = BTreeMap::new();
616		input.insert(v(1), vec![(s(1), diffs)]);
617
618		let result = engine.create_partition(input);
619
620		assert!(result.is_empty(), "No subscribers means no units");
621	}
622
623	#[test]
624	fn test_many_versions_sparse_changes() {
625		// S1 -> [F1]
626		let mut subscriptions = HashMap::new();
627		subscriptions.insert(s(1), vec![f(1)]);
628		let engine = setup_test_engine(subscriptions);
629
630		// 100 versions, but flow only affected by versions 10, 50, 90
631		let mut input = BTreeMap::new();
632		for i in 1..=100 {
633			if i == 10 || i == 50 || i == 90 {
634				input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
635			} else {
636				// Other sources not subscribed by F1
637				input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
638			}
639		}
640
641		let result = engine.create_partition(input);
642		let normalized = normalize(result);
643
644		// F1 should only have 3 units
645		let f1_units = &normalized[&f(1)];
646		assert_eq!(f1_units.len(), 3);
647		assert_eq!(f1_units[0].version, v(10));
648		assert_eq!(f1_units[1].version, v(50));
649		assert_eq!(f1_units[2].version, v(90));
650	}
651
652	#[test]
653	fn test_many_sources_selective_subscription() {
654		// F1 subscribes to only S5, S15, S25, S35, S45 out of 50 sources
655		let mut subscriptions = HashMap::new();
656		for i in 1..=50 {
657			if i % 10 == 5 {
658				subscriptions.insert(s(i), vec![f(1)]);
659			}
660		}
661		let engine = setup_test_engine(subscriptions);
662
663		// All 50 sources change
664		let mut changes = vec![];
665		for i in 1..=50 {
666			changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
667		}
668		let mut input = BTreeMap::new();
669		input.insert(v(1), changes);
670
671		let result = engine.create_partition(input);
672		let normalized = normalize(result);
673
674		// F1 should have 1 unit with exactly 5 sources
675		let f1_units = &normalized[&f(1)];
676		assert_eq!(f1_units.len(), 1);
677		let (_, sources) = snapshot_unit(&f1_units[0]);
678		assert_eq!(sources.len(), 5);
679		assert!(sources.contains_key(&s(5)));
680		assert!(sources.contains_key(&s(15)));
681		assert!(sources.contains_key(&s(25)));
682		assert!(sources.contains_key(&s(35)));
683		assert!(sources.contains_key(&s(45)));
684	}
685
686	#[test]
687	fn test_input_permutation_invariance() {
688		// S1 -> [F1, F2]
689		let mut subscriptions = HashMap::new();
690		subscriptions.insert(s(1), vec![f(1), f(2)]);
691		let engine = setup_test_engine(subscriptions);
692
693		// Create input with versions in different orders
694		let test_versions =
695			vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];
696
697		let mut results = vec![];
698		for versions in test_versions {
699			let mut input = BTreeMap::new();
700			for ver in versions {
701				input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
702			}
703			let result = engine.create_partition(input);
704			results.push(normalize(result));
705		}
706
707		// All permutations should produce the same normalized output
708		for i in 1..results.len() {
709			assert_normalized_eq(&results[0], &results[i]);
710		}
711
712		// Verify correct ordering
713		let f1_units = &results[0][&f(1)];
714		assert_eq!(f1_units.len(), 3);
715		assert_eq!(f1_units[0].version, v(10));
716		assert_eq!(f1_units[1].version, v(20));
717		assert_eq!(f1_units[2].version, v(30));
718	}
719
720	#[test]
721	fn test_empty_diff_vec_handling() {
722		// S1 -> [F1]
723		let mut subscriptions = HashMap::new();
724		subscriptions.insert(s(1), vec![f(1)]);
725		let engine = setup_test_engine(subscriptions);
726
727		// V1: S1 with empty diff vec
728		let mut input = BTreeMap::new();
729		input.insert(v(1), vec![(s(1), vec![])]);
730
731		let result = engine.create_partition(input);
732		let normalized = normalize(result);
733
734		// Current behavior: empty diffs still create a unit with 0 count
735		// This documents the actual behavior
736		let f1_units = &normalized[&f(1)];
737		assert_eq!(f1_units.len(), 1);
738		let (ver, sources) = snapshot_unit(&f1_units[0]);
739		assert_eq!(ver, v(1));
740		assert_eq!(sources.get(&s(1)), Some(&0));
741	}
742
743	#[test]
744	fn test_all_sources_unknown() {
745		// S1 -> [F1]
746		let mut subscriptions = HashMap::new();
747		subscriptions.insert(s(1), vec![f(1)]);
748		let engine = setup_test_engine(subscriptions);
749
750		// All input sources are unknown
751		let mut input = BTreeMap::new();
752		input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
753		input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);
754
755		let result = engine.create_partition(input);
756
757		assert!(result.is_empty(), "All unknown sources should produce empty output");
758	}
759
760	#[test]
761	fn test_permutation_regression_fanout() {
762		// S1 -> [F1, F2, F3]
763		let mut subscriptions = HashMap::new();
764		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
765		let engine = setup_test_engine(subscriptions);
766
767		// Original input
768		let mut input = BTreeMap::new();
769		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);
770
771		let expected = normalize(engine.create_partition(input.clone()));
772
773		// Test 5 random permutations
774		for _ in 0..5 {
775			let mut entries: Vec<_> = input.clone().into_iter().collect();
776			entries.shuffle(&mut rand::rng());
777			let shuffled: BTreeMap<_, _> = entries.into_iter().collect();
778
779			let result = normalize(engine.create_partition(shuffled));
780			assert_normalized_eq(&result, &expected);
781		}
782	}
783
784	#[test]
785	fn test_permutation_regression_complex() {
786		use rand::prelude::*;
787
788		// S1 -> [F1,F2], S2 -> [F2]
789		let mut subscriptions = HashMap::new();
790		subscriptions.insert(s(1), vec![f(1), f(2)]);
791		subscriptions.insert(s(2), vec![f(2)]);
792		let engine = setup_test_engine(subscriptions);
793
794		// Original input
795		let mut input = BTreeMap::new();
796		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
797		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
798
799		let expected = normalize(engine.create_partition(input.clone()));
800
801		// Test 5 random permutations
802		for _ in 0..5 {
803			let mut entries: Vec<_> = input.clone().into_iter().collect();
804			entries.shuffle(&mut rand::rng());
805			let shuffled: BTreeMap<_, _> = entries.into_iter().collect();
806
807			let result = normalize(engine.create_partition(shuffled));
808			assert_normalized_eq(&result, &expected);
809		}
810	}
811
812	#[test]
813	fn test_large_input_smoke() {
814		// 20 sources, 20 flows, sparse subscriptions
815		let mut subscriptions = HashMap::new();
816		for flow_i in 1..=20 {
817			// Each flow subscribes to 3 sources
818			for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
819				let source_id = s(source_i % 20 + 1);
820				subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
821			}
822		}
823		let engine = setup_test_engine(subscriptions);
824
825		// 1000 versions, each with 5 random sources changing
826		let mut input = BTreeMap::new();
827		for ver_i in 1..=1000 {
828			let mut changes = vec![];
829			for source_i in 1..=5 {
830				changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
831			}
832			input.insert(v(ver_i), changes);
833		}
834
835		// Should complete without panic
836		let result = engine.create_partition(input);
837		let normalized = normalize(result);
838
839		// Basic sanity checks
840		assert!(!normalized.is_empty());
841
842		// Verify all flows have ordered versions
843		for (_, units) in normalized {
844			for i in 1..units.len() {
845				assert!(units[i - 1].version < units[i].version, "Versions not sorted");
846			}
847		}
848	}
849
850	#[test]
851	fn test_version_ordering_maintained_under_stress() {
852		let mut subscriptions = HashMap::new();
853		subscriptions.insert(s(1), vec![f(1)]);
854		let engine = setup_test_engine(subscriptions);
855
856		let mut rng = rng();
857		let mut versions: Vec<u64> = (1..=100).collect();
858		for _ in 0..10 {
859			versions.shuffle(&mut rng);
860		}
861
862		let mut input = BTreeMap::new();
863		for ver in versions {
864			input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
865		}
866
867		let result = engine.create_partition(input);
868		let normalized = normalize(result);
869
870		let f1_units = &normalized[&f(1)];
871		assert_eq!(f1_units.len(), 100);
872
873		for i in 0..100 {
874			assert_eq!(f1_units[i].version, v((i + 1) as u64));
875		}
876	}
877
878	#[test]
879	fn test_version_filtering() {
880		let mut subscriptions = HashMap::new();
881		subscriptions.insert(s(1), vec![f(1)]);
882		let engine = setup_test_engine(subscriptions);
883
884		engine.inner.flow_creation_versions.write().insert(f(1), v(50));
885
886		let mut input = BTreeMap::new();
887		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
888		input.insert(v(50), vec![(s(1), vec![mk_diff("d50")])]);
889		input.insert(v(51), vec![(s(1), vec![mk_diff("d51")])]);
890		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
891		input.insert(v(70), vec![(s(1), vec![mk_diff("d70")])]);
892
893		let result = engine.create_partition(input);
894		let normalized = normalize(result);
895
896		let f1_units = &normalized[&f(1)];
897		assert_eq!(f1_units.len(), 4);
898		assert_eq!(f1_units[0].version, v(50));
899		assert_eq!(f1_units[1].version, v(51));
900		assert_eq!(f1_units[2].version, v(60));
901		assert_eq!(f1_units[3].version, v(70));
902	}
903
904	#[test]
905	fn test_backfill_version_per_flow_isolation() {
906		let mut subscriptions = HashMap::new();
907		subscriptions.insert(s(1), vec![f(1), f(2)]);
908		let engine = setup_test_engine(subscriptions);
909
910		engine.inner.flow_creation_versions.write().insert(f(1), v(30));
911		engine.inner.flow_creation_versions.write().insert(f(2), v(50));
912
913		let mut input = BTreeMap::new();
914		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
915		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
916		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
917
918		let result = engine.create_partition(input);
919		let normalized = normalize(result);
920
921		let f1_units = &normalized[&f(1)];
922		assert_eq!(f1_units.len(), 2);
923		assert_eq!(f1_units[0].version, v(40));
924		assert_eq!(f1_units[1].version, v(60));
925
926		let f2_units = &normalized[&f(2)];
927		assert_eq!(f2_units.len(), 1);
928		assert_eq!(f2_units[0].version, v(60));
929	}
930
931	#[test]
932	fn test_no_backfill_version_processes_all() {
933		let mut subscriptions = HashMap::new();
934		subscriptions.insert(s(1), vec![f(1)]);
935		let engine = setup_test_engine(subscriptions);
936
937		let mut input = BTreeMap::new();
938		input.insert(v(10), vec![(s(1), vec![mk_diff("d10")])]);
939		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
940		input.insert(v(30), vec![(s(1), vec![mk_diff("d30")])]);
941
942		let result = engine.create_partition(input);
943		let normalized = normalize(result);
944
945		let f1_units = &normalized[&f(1)];
946		assert_eq!(f1_units.len(), 3);
947	}
948
949	#[test]
950	fn test_backfill_version_exact_boundary() {
951		let mut subscriptions = HashMap::new();
952		subscriptions.insert(s(1), vec![f(1)]);
953		let engine = setup_test_engine(subscriptions);
954
955		engine.inner.flow_creation_versions.write().insert(f(1), v(100));
956
957		let mut input = BTreeMap::new();
958		input.insert(v(99), vec![(s(1), vec![mk_diff("d99")])]);
959		input.insert(v(100), vec![(s(1), vec![mk_diff("d100")])]);
960		input.insert(v(101), vec![(s(1), vec![mk_diff("d101")])]);
961
962		let result = engine.create_partition(input);
963		let normalized = normalize(result);
964
965		let f1_units = &normalized[&f(1)];
966		assert_eq!(f1_units.len(), 2);
967		assert_eq!(f1_units[0].version, v(100));
968		assert_eq!(f1_units[1].version, v(101));
969	}
970}