reifydb_sub_flow/engine/
partition.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::collections::BTreeMap;
5
6use indexmap::IndexMap;
7use reifydb_core::{
8	CommitVersion,
9	interface::{FlowId, SourceId},
10};
11use reifydb_flow_operator_sdk::{FlowChange, FlowDiff};
12
13use crate::worker::{UnitOfWork, UnitsOfWork};
14
15impl crate::engine::FlowEngine {
16	/// Partition changes from multiple versions into units of work grouped by flow
17	///
18	/// This method handles the complete partitioning logic:
19	/// 1. Processes each version's changes separately
20	/// 2. Groups units by flow across all versions
21	/// 3. Maintains version ordering within each flow
22	///
23	/// # Arguments
24	/// * `changes_by_version` - Map of version to (source_id, changes) pairs
25	///
26	/// # Returns
27	/// UnitsOfWork where each flow has its units ordered by version
28	pub async fn create_partition(
29		&self,
30		changes_by_version: BTreeMap<CommitVersion, Vec<(SourceId, Vec<FlowDiff>)>>,
31	) -> UnitsOfWork {
32		let mut all_units_by_flow: BTreeMap<FlowId, Vec<UnitOfWork>> = BTreeMap::new();
33
34		// BTreeMap is already sorted by key, so we iterate in version order
35		for (version, changes) in &changes_by_version {
36			let version = *version;
37
38			// Group changes by source for this version
39			// Using IndexMap to preserve CDC insertion order within the version
40			let mut changes_by_source: IndexMap<SourceId, Vec<FlowDiff>> = IndexMap::new();
41			for (source_id, diffs) in changes {
42				changes_by_source.entry(*source_id).or_insert_with(Vec::new).extend(diffs.clone());
43			}
44
45			// Partition this version's changes into units of work
46			let version_units = self.partition_into_units_of_work(changes_by_source, version).await;
47
48			// Merge units from this version into the overall collection
49			// Each flow's units are stored in a Vec to maintain version ordering
50			for flow_units in version_units.into_inner() {
51				for unit in flow_units {
52					all_units_by_flow.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
53				}
54			}
55		}
56
57		// Convert the HashMap to UnitsOfWork for the worker
58		let units_vec: Vec<Vec<UnitOfWork>> = all_units_by_flow.into_iter().map(|(_, units)| units).collect();
59
60		// INVARIANT: Validate that each flow_id appears exactly once in the output
61		// and that each inner Vec contains units for only one flow
62		{
63			use std::collections::HashSet;
64			let mut seen_flows = HashSet::new();
65
66			for flow_units in &units_vec {
67				assert!(!flow_units.is_empty(), "INVARIANT VIOLATED: Empty flow units in UnitsOfWork");
68
69				let flow_id = flow_units[0].flow_id;
70				assert!(
71					!seen_flows.contains(&flow_id),
72					"INVARIANT VIOLATED: flow_id {:?} appears multiple times in UnitsOfWork. \
73					This means the same flow will be processed by multiple parallel tasks, \
74					causing keyspace overlap.",
75					flow_id
76				);
77
78				// Validate all units in this Vec are for the same flow
79				for unit in flow_units {
80					assert_eq!(
81						unit.flow_id, flow_id,
82						"INVARIANT VIOLATED: Mixed flow_ids in same Vec - expected {:?}, got {:?}. \
83						All units in a Vec must belong to the same flow.",
84						flow_id, unit.flow_id
85					);
86				}
87
88				seen_flows.insert(flow_id);
89			}
90		}
91
92		UnitsOfWork::new(units_vec)
93	}
94
95	async fn partition_into_units_of_work(
96		&self,
97		changes_by_source: IndexMap<SourceId, Vec<FlowDiff>>,
98		version: CommitVersion,
99	) -> UnitsOfWork {
100		// Map to collect all source changes per flow
101		let mut flow_changes: BTreeMap<FlowId, Vec<FlowChange>> = BTreeMap::new();
102
103		// Read source subscriptions and backfill versions
104		let sources = self.inner.sources.read().await;
105		let flow_creation_version = self.inner.flow_creation_versions.read().await;
106
107		// For each source that changed
108		for (source_id, diffs) in changes_by_source {
109			// Find all flows subscribed to this source
110			if let Some(subscriptions) = sources.get(&source_id) {
111				for (flow_id, node_id) in subscriptions {
112					// Skip CDC events that were already included in the backfill
113					if let Some(&flow_creation_version) = flow_creation_version.get(flow_id) {
114						if version < flow_creation_version {
115							continue;
116						}
117					}
118
119					// Create FlowChange scoped to the specific node in this flow
120					// This ensures each flow only processes its own nodes, preventing keyspace
121					// overlap
122					let change = FlowChange::internal(*node_id, version, diffs.clone());
123					flow_changes.entry(*flow_id).or_insert_with(Vec::new).push(change);
124				}
125			}
126		}
127
128		// Group all units at this version into a single UnitsOfWork
129		// Since all units are at the same version, each flow gets exactly one unit
130		let units: Vec<Vec<UnitOfWork>> = flow_changes
131			.into_iter()
132			.map(|(flow_id, source_changes)| vec![UnitOfWork::new(flow_id, version, source_changes)])
133			.collect();
134
135		UnitsOfWork::new(units)
136	}
137}
138
139#[cfg(test)]
140mod tests {
141	use std::{
142		collections::{BTreeMap, HashMap},
143		sync::Arc,
144	};
145
146	use rand::{rng, seq::SliceRandom};
147	use reifydb_core::{
148		CommitVersion, Row,
149		event::EventBus,
150		interface::{FlowId, FlowNodeId, SourceId, TableId},
151		util::CowVec,
152		value::encoded::{EncodedValues, EncodedValuesNamedLayout},
153	};
154	use reifydb_engine::{StandardRowEvaluator, execute::Executor};
155	use reifydb_flow_operator_sdk::{FlowChangeOrigin, FlowDiff};
156	use reifydb_rql::flow::FlowGraphAnalyzer;
157	use reifydb_type::{RowNumber, Type};
158	use tokio::sync::RwLock;
159
160	use crate::{
161		engine::{FlowEngine, FlowEngineInner},
162		operator::transform::registry::TransformOperatorRegistry,
163		worker::{UnitOfWork, UnitsOfWork},
164	};
165
166	/// Helper to build sources map with explicit node IDs
167	/// Maps source_id to list of (flow_id, node_id) pairs
168	/// The node_id encodes BOTH the source and flow: source_id*1000 + flow_id
169	/// This ensures unique node IDs across all source subscriptions
170	fn mk_sources(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> HashMap<SourceId, Vec<(FlowId, FlowNodeId)>> {
171		let mut sources_map = HashMap::new();
172		for (source_id, flows) in subscriptions {
173			let source_num = match source_id {
174				SourceId::Table(tid) => tid.0,
175				_ => panic!("Only Table sources supported in tests"),
176			};
177			let subscriptions_with_nodes: Vec<(FlowId, FlowNodeId)> = flows
178				.into_iter()
179				.map(|flow_id| {
180					// Node ID = source * 1000 + flow, ensuring global uniqueness
181					let node_id = FlowNodeId(source_num * 1000 + flow_id.0);
182					(flow_id, node_id)
183				})
184				.collect();
185			sources_map.insert(source_id, subscriptions_with_nodes);
186		}
187		sources_map
188	}
189
	/// Build a minimal FlowEngine for tests: the `sources` subscription map is
	/// seeded from `subscriptions` (via `mk_sources`); every other field is
	/// empty/default, including `flow_creation_versions` (tests that need
	/// backfill filtering populate it afterwards).
	fn setup_test_engine(subscriptions: HashMap<SourceId, Vec<FlowId>>) -> FlowEngine {
		let evaluator = StandardRowEvaluator::default();
		let executor = Executor::testing();
		let registry = TransformOperatorRegistry::new();

		let sources = mk_sources(subscriptions);

		let inner = FlowEngineInner {
			evaluator,
			executor,
			registry,
			operators: RwLock::new(HashMap::new()),
			flows: RwLock::new(HashMap::new()),
			sources: RwLock::new(sources),
			sinks: RwLock::new(HashMap::new()),
			analyzer: RwLock::new(FlowGraphAnalyzer::new()),
			event_bus: EventBus::new(),
			flow_creation_versions: RwLock::new(HashMap::new()),
		};

		FlowEngine {
			inner: Arc::new(inner),
		}
	}
214
215	/// Create a test FlowDiff with identifiable data
216	fn mk_diff(label: &str) -> FlowDiff {
217		let row = mk_row(label);
218		FlowDiff::Insert {
219			post: row,
220		}
221	}
222
223	/// Create a test Row with identifiable data
224	fn mk_row(label: &str) -> Row {
225		Row {
226			number: RowNumber(label.len() as u64),
227			encoded: EncodedValues(CowVec::new(label.as_bytes().to_vec())),
228			layout: EncodedValuesNamedLayout::new(std::iter::once(("test".to_string(), Type::Uint8))),
229		}
230	}
231
232	/// Normalize UnitsOfWork into a sorted map for comparison
233	fn normalize(units: UnitsOfWork) -> BTreeMap<FlowId, Vec<UnitOfWork>> {
234		let mut map = BTreeMap::new();
235		for flow_units in units.into_inner() {
236			for unit in flow_units {
237				map.entry(unit.flow_id).or_insert_with(Vec::new).push(unit);
238			}
239		}
240		// Sort each flow's units by version
241		for vec in map.values_mut() {
242			vec.sort_by_key(|u| u.version);
243		}
244		map
245	}
246
247	/// Extract a snapshot of a unit: (version, source_id -> diff_count)
248	/// Note: After the fix, FlowChanges use Internal origin with node_id
249	/// In tests, node_id = source_id * 1000 + flow_id, so we reverse-engineer source_id
250	fn snapshot_unit(unit: &UnitOfWork) -> (CommitVersion, BTreeMap<SourceId, usize>) {
251		let mut sources = BTreeMap::new();
252
253		for change in &unit.source_changes {
254			match change.origin {
255				FlowChangeOrigin::External(source_id) => {
256					let count = change.diffs.len();
257					*sources.entry(source_id).or_insert(0) += count;
258				}
259				FlowChangeOrigin::Internal(node_id) => {
260					// In test setup, node_id = source_id * 1000 + flow_id
261					// So source_id = node_id / 1000
262					let source_num = node_id.0 / 1000;
263					let source_id = SourceId::Table(TableId(source_num));
264					let count = change.diffs.len();
265					*sources.entry(source_id).or_insert(0) += count;
266				}
267			}
268		}
269		(unit.version, sources)
270	}
271
	/// Helper to create source IDs (tests only use table-backed sources)
	fn s(id: u64) -> SourceId {
		SourceId::Table(TableId(id))
	}

	/// Helper to create flow IDs
	fn f(id: u64) -> FlowId {
		FlowId(id)
	}

	/// Helper to create commit versions
	fn v(ver: u64) -> CommitVersion {
		CommitVersion(ver)
	}
286
287	/// Compare two normalized results by their snapshots
288	fn assert_normalized_eq(
289		result: &BTreeMap<FlowId, Vec<UnitOfWork>>,
290		expected: &BTreeMap<FlowId, Vec<UnitOfWork>>,
291	) {
292		assert_eq!(result.len(), expected.len(), "Different number of flows");
293
294		for (flow_id, result_units) in result {
295			let expected_units =
296				expected.get(flow_id).expect(&format!("Flow {:?} missing in expected", flow_id));
297			assert_eq!(
298				result_units.len(),
299				expected_units.len(),
300				"Flow {:?} has different unit count",
301				flow_id
302			);
303
304			for (i, (result_unit, expected_unit)) in
305				result_units.iter().zip(expected_units.iter()).enumerate()
306			{
307				let result_snapshot = snapshot_unit(result_unit);
308				let expected_snapshot = snapshot_unit(expected_unit);
309				assert_eq!(
310					result_snapshot, expected_snapshot,
311					"Flow {:?} unit {} differs: {:?} vs {:?}",
312					flow_id, i, result_snapshot, expected_snapshot
313				);
314			}
315		}
316	}
317
	/// An empty change map must produce an empty UnitsOfWork.
	#[tokio::test]
	async fn test_empty_input() {
		let engine = setup_test_engine(HashMap::new());
		let input = BTreeMap::new();

		let result = engine.create_partition(input).await;

		assert!(result.is_empty(), "Empty input should produce empty output");
	}
327
	/// Simplest happy path: one version, one source, one subscribed flow.
	#[tokio::test]
	async fn test_single_version_single_source_single_flow() {
		// S1 -> F1
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V10: S1[d1, d2]
		let mut input = BTreeMap::new();
		input.insert(v(10), vec![(s(1), vec![mk_diff("d1"), mk_diff("d2")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// Expect F1 has 1 unit at V10 with S1:2 diffs
		assert_eq!(normalized.len(), 1);
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);

		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(10));
		assert_eq!(sources.get(&s(1)), Some(&2));
	}
351
	/// One source change fans out to every subscribed flow as its own unit.
	#[tokio::test]
	async fn test_single_version_multi_flow_fanout() {
		// S1 -> [F1, F2, F3]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		// V1: S1[d1]
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// Expect F1, F2, F3 each have 1 unit at V1 with S1:1
		assert_eq!(normalized.len(), 3);

		for flow_id in [f(1), f(2), f(3)] {
			let units = &normalized[&flow_id];
			assert_eq!(units.len(), 1);
			let (ver, sources) = snapshot_unit(&units[0]);
			assert_eq!(ver, v(1));
			assert_eq!(sources.get(&s(1)), Some(&1));
		}
	}
377
	/// Overlapping subscriptions: each flow receives exactly the sources it
	/// subscribed to, merged into one unit per version.
	#[tokio::test]
	async fn test_single_version_multi_source_partial_overlap() {
		// S1 -> [F1, F2], S2 -> [F2, F3]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		// V7: S1[a], S2[b,c]
		let mut input = BTreeMap::new();
		input.insert(v(7), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		assert_eq!(normalized.len(), 3);

		// F1 @V7: S1:1
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(1)), Some(&1));

		// F2 @V7: S1:1, S2:2
		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));

		// F3 @V7: S2:2
		let f3_units = &normalized[&f(3)];
		assert_eq!(f3_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f3_units[0]);
		assert_eq!(ver, v(7));
		assert_eq!(sources.len(), 1);
		assert_eq!(sources.get(&s(2)), Some(&2));
	}
420
	/// Changes from a source with no subscriptions are silently dropped.
	#[tokio::test]
	async fn test_unknown_source_filtered() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V3: S999[x] (unknown source)
		let mut input = BTreeMap::new();
		input.insert(v(3), vec![(s(999), vec![mk_diff("x")])]);

		let result = engine.create_partition(input).await;

		assert!(result.is_empty(), "Unknown sources should produce no units");
	}
436
	/// Units within a flow come out in ascending version order even when the
	/// input versions are inserted out of order (BTreeMap sorts the keys).
	#[tokio::test]
	async fn test_multi_version_ordering() {
		// S1 -> [F1], S2 -> [F2]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		// Input versions intentionally unsorted: V20, V10, V30
		let mut input = BTreeMap::new();
		input.insert(v(20), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(10), vec![(s(1), vec![mk_diff("b")])]);
		input.insert(v(30), vec![(s(2), vec![mk_diff("c")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1: units at V10 then V20 (ascending)
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));

		// F2: single unit at V30
		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		assert_eq!(f2_units[0].version, v(30));
	}
465
	/// Non-contiguous versions are kept as-is; no units are synthesized for
	/// the gap in between.
	#[tokio::test]
	async fn test_version_gaps_preserved() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V1, V100 (non-contiguous)
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a")])]);
		input.insert(v(100), vec![(s(1), vec![mk_diff("b")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1 has exactly 2 units at V1 and V100
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(1));
		assert_eq!(f1_units[1].version, v(100));
	}
487
	/// Multiple entries for the same source within one version are merged into
	/// a single per-source diff list before partitioning.
	#[tokio::test]
	async fn test_duplicate_source_entries_merged() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V5: S1[x,y], S1[z] (duplicate source entries)
		let mut input = BTreeMap::new();
		input.insert(v(5), vec![(s(1), vec![mk_diff("x"), mk_diff("y")]), (s(1), vec![mk_diff("z")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1 @V5 should have S1:3 diffs (merged)
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(5));
		assert_eq!(sources.get(&s(1)), Some(&3));
	}
509
	/// A flow subscribed to several sources gets all of them in ONE unit when
	/// they change at the same version.
	#[tokio::test]
	async fn test_flow_with_multiple_sources_same_version() {
		// S1 -> [F1], S2 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V8: S1[a], S2[b,c]
		let mut input = BTreeMap::new();
		input.insert(v(8), vec![(s(1), vec![mk_diff("a")]), (s(2), vec![mk_diff("b"), mk_diff("c")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1 @V8 has two sources: S1:1, S2:2
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(8));
		assert_eq!(sources.len(), 2);
		assert_eq!(sources.get(&s(1)), Some(&1));
		assert_eq!(sources.get(&s(2)), Some(&2));
	}
534
	/// Flows whose sources did not change must not appear in the output at all.
	#[tokio::test]
	async fn test_no_work_for_unaffected_flows() {
		// S1 -> [F1], S2 -> [F2]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		// V2: S1[a] only
		let mut input = BTreeMap::new();
		input.insert(v(2), vec![(s(1), vec![mk_diff("a")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// Only F1 should have work
		assert_eq!(normalized.len(), 1);
		assert!(normalized.contains_key(&f(1)));
		assert!(!normalized.contains_key(&f(2)));
	}
555
	/// Combined scenario: fan-out plus multiple versions, checking the exact
	/// per-unit (version, source -> count) snapshots for each flow.
	#[tokio::test]
	async fn test_complex_multi_flow_multi_version() {
		// S1 -> [F1,F2], S2 -> [F2]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		subscriptions.insert(s(2), vec![f(2)]);
		let engine = setup_test_engine(subscriptions);

		// V1: S1[a1], S2[b1]
		// V2: S1[a2]
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1: V1 {S1:1}, V2 {S1:1}
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f1_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));

		// F2: V1 {S1:1, S2:1}, V2 {S1:1}
		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 2);

		let (ver1, sources1) = snapshot_unit(&f2_units[0]);
		assert_eq!(ver1, v(1));
		assert_eq!(sources1.get(&s(1)), Some(&1));
		assert_eq!(sources1.get(&s(2)), Some(&1));

		let (ver2, sources2) = snapshot_unit(&f2_units[1]);
		assert_eq!(ver2, v(2));
		assert_eq!(sources2.get(&s(1)), Some(&1));
	}
598
	/// A large batch of diffs with no subscribers still yields empty output.
	#[tokio::test]
	async fn test_large_diffs_zero_subscribers() {
		let engine = setup_test_engine(HashMap::new());

		// Many diffs but no subscribers
		let diffs: Vec<FlowDiff> = (0..1000).map(|i| mk_diff(&format!("d{}", i))).collect();
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), diffs)]);

		let result = engine.create_partition(input).await;

		assert!(result.is_empty(), "No subscribers means no units");
	}
612
	/// Only versions that actually touch a subscribed source produce units for
	/// the flow; the other 97 versions are filtered out.
	#[tokio::test]
	async fn test_many_versions_sparse_changes() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// 100 versions, but flow only affected by versions 10, 50, 90
		let mut input = BTreeMap::new();
		for i in 1..=100 {
			if i == 10 || i == 50 || i == 90 {
				input.insert(v(i), vec![(s(1), vec![mk_diff(&format!("d{}", i))])]);
			} else {
				// Other sources not subscribed by F1
				input.insert(v(i), vec![(s(999), vec![mk_diff("x")])]);
			}
		}

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1 should only have 3 units
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(50));
		assert_eq!(f1_units[2].version, v(90));
	}
641
	/// With 50 changing sources, a flow only sees the 5 it subscribed to.
	#[tokio::test]
	async fn test_many_sources_selective_subscription() {
		// F1 subscribes to only S5, S15, S25, S35, S45 out of 50 sources
		let mut subscriptions = HashMap::new();
		for i in 1..=50 {
			if i % 10 == 5 {
				subscriptions.insert(s(i), vec![f(1)]);
			}
		}
		let engine = setup_test_engine(subscriptions);

		// All 50 sources change
		let mut changes = vec![];
		for i in 1..=50 {
			changes.push((s(i), vec![mk_diff(&format!("d{}", i))]));
		}
		let mut input = BTreeMap::new();
		input.insert(v(1), changes);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// F1 should have 1 unit with exactly 5 sources
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (_, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(sources.len(), 5);
		assert!(sources.contains_key(&s(5)));
		assert!(sources.contains_key(&s(15)));
		assert!(sources.contains_key(&s(25)));
		assert!(sources.contains_key(&s(35)));
		assert!(sources.contains_key(&s(45)));
	}
675
	/// Insertion order into the input map must not affect the result.
	/// NOTE(review): the input is a BTreeMap, so the three permutations build
	/// the identical map — this exercises map construction order only, not a
	/// truly reordered input stream; confirm that is the intent.
	#[tokio::test]
	async fn test_input_permutation_invariance() {
		// S1 -> [F1, F2]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		let engine = setup_test_engine(subscriptions);

		// Create input with versions in different orders
		let test_versions =
			vec![vec![v(10), v(20), v(30)], vec![v(30), v(10), v(20)], vec![v(20), v(30), v(10)]];

		let mut results = vec![];
		for versions in test_versions {
			let mut input = BTreeMap::new();
			for ver in versions {
				input.insert(ver, vec![(s(1), vec![mk_diff(&format!("d{}", ver.0))])]);
			}
			let result = engine.create_partition(input).await;
			results.push(normalize(result));
		}

		// All permutations should produce the same normalized output
		for i in 1..results.len() {
			assert_normalized_eq(&results[0], &results[i]);
		}

		// Verify correct ordering
		let f1_units = &results[0][&f(1)];
		assert_eq!(f1_units.len(), 3);
		assert_eq!(f1_units[0].version, v(10));
		assert_eq!(f1_units[1].version, v(20));
		assert_eq!(f1_units[2].version, v(30));
	}
709
	/// A subscribed source with an empty diff list still produces a unit
	/// (documents current behavior rather than asserting it is desirable).
	#[tokio::test]
	async fn test_empty_diff_vec_handling() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// V1: S1 with empty diff vec
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// Current behavior: empty diffs still create a unit with 0 count
		// This documents the actual behavior
		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 1);
		let (ver, sources) = snapshot_unit(&f1_units[0]);
		assert_eq!(ver, v(1));
		assert_eq!(sources.get(&s(1)), Some(&0));
	}
732
	/// Multiple versions of purely-unknown sources produce no output at all.
	#[tokio::test]
	async fn test_all_sources_unknown() {
		// S1 -> [F1]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		// All input sources are unknown
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(999), vec![mk_diff("x")])]);
		input.insert(v(2), vec![(s(888), vec![mk_diff("y")])]);

		let result = engine.create_partition(input).await;

		assert!(result.is_empty(), "All unknown sources should produce empty output");
	}
749
	/// Shuffled entry order must not change the fan-out result.
	/// NOTE(review): collecting shuffled entries back into a BTreeMap re-sorts
	/// them, so this checks map reconstruction, not true input reordering.
	#[tokio::test]
	async fn test_permutation_regression_fanout() {
		// S1 -> [F1, F2, F3]
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2), f(3)]);
		let engine = setup_test_engine(subscriptions);

		// Original input
		let mut input = BTreeMap::new();
		input.insert(v(1), vec![(s(1), vec![mk_diff("d1")])]);

		let expected = normalize(engine.create_partition(input.clone()).await);

		// Test 5 random permutations
		for _ in 0..5 {
			let mut entries: Vec<_> = input.clone().into_iter().collect();
			entries.shuffle(&mut rand::rng());
			let shuffled: BTreeMap<_, _> = entries.into_iter().collect();

			let result = normalize(engine.create_partition(shuffled).await);
			assert_normalized_eq(&result, &expected);
		}
	}
773
774	#[tokio::test]
775	async fn test_permutation_regression_complex() {
776		use rand::prelude::*;
777
778		// S1 -> [F1,F2], S2 -> [F2]
779		let mut subscriptions = HashMap::new();
780		subscriptions.insert(s(1), vec![f(1), f(2)]);
781		subscriptions.insert(s(2), vec![f(2)]);
782		let engine = setup_test_engine(subscriptions);
783
784		// Original input
785		let mut input = BTreeMap::new();
786		input.insert(v(1), vec![(s(1), vec![mk_diff("a1")]), (s(2), vec![mk_diff("b1")])]);
787		input.insert(v(2), vec![(s(1), vec![mk_diff("a2")])]);
788
789		let expected = normalize(engine.create_partition(input.clone()).await);
790
791		// Test 5 random permutations
792		for _ in 0..5 {
793			let mut entries: Vec<_> = input.clone().into_iter().collect();
794			entries.shuffle(&mut rand::rng());
795			let shuffled: BTreeMap<_, _> = entries.into_iter().collect();
796
797			let result = normalize(engine.create_partition(shuffled).await);
798			assert_normalized_eq(&result, &expected);
799		}
800	}
801
	/// Smoke test: a large (1000-version) input completes without panicking
	/// and every flow's units come out version-sorted.
	#[tokio::test]
	async fn test_large_input_smoke() {
		// 20 sources, 20 flows, sparse subscriptions
		let mut subscriptions = HashMap::new();
		for flow_i in 1..=20 {
			// Each flow subscribes to 3 sources
			for source_i in ((flow_i - 1) * 3 + 1)..=((flow_i - 1) * 3 + 3) {
				let source_id = s(source_i % 20 + 1);
				subscriptions.entry(source_id).or_insert_with(Vec::new).push(f(flow_i));
			}
		}
		let engine = setup_test_engine(subscriptions);

		// 1000 versions, each with 5 sources changing (deterministic pattern,
		// despite the variety it produces — no randomness here)
		let mut input = BTreeMap::new();
		for ver_i in 1..=1000 {
			let mut changes = vec![];
			for source_i in 1..=5 {
				changes.push((s((ver_i % 20) + source_i), vec![mk_diff(&format!("d{}", ver_i))]));
			}
			input.insert(v(ver_i), changes);
		}

		// Should complete without panic
		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		// Basic sanity checks
		assert!(!normalized.is_empty());

		// Verify all flows have ordered versions
		for (_, units) in normalized {
			for i in 1..units.len() {
				assert!(units[i - 1].version < units[i].version, "Versions not sorted");
			}
		}
	}
839
	/// 100 versions inserted in shuffled order still come out as a dense,
	/// ascending sequence (BTreeMap sorts keys regardless of insertion order).
	#[tokio::test]
	async fn test_version_ordering_maintained_under_stress() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut rng = rng();
		let mut versions: Vec<u64> = (1..=100).collect();
		for _ in 0..10 {
			versions.shuffle(&mut rng);
		}

		let mut input = BTreeMap::new();
		for ver in versions {
			input.insert(v(ver), vec![(s(1), vec![mk_diff(&format!("d{}", ver))])]);
		}

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 100);

		for i in 0..100 {
			assert_eq!(f1_units[i].version, v((i + 1) as u64));
		}
	}
867
	/// With a flow creation version of 50, versions below 50 are skipped and
	/// versions >= 50 (inclusive boundary) are processed.
	#[tokio::test]
	async fn test_version_filtering() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().await.insert(f(1), v(50));

		let mut input = BTreeMap::new();
		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
		input.insert(v(50), vec![(s(1), vec![mk_diff("d50")])]);
		input.insert(v(51), vec![(s(1), vec![mk_diff("d51")])]);
		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);
		input.insert(v(70), vec![(s(1), vec![mk_diff("d70")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 4);
		assert_eq!(f1_units[0].version, v(50));
		assert_eq!(f1_units[1].version, v(51));
		assert_eq!(f1_units[2].version, v(60));
		assert_eq!(f1_units[3].version, v(70));
	}
893
	/// Each flow's creation-version cutoff is applied independently, even when
	/// both flows share the same source.
	#[tokio::test]
	async fn test_backfill_version_per_flow_isolation() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1), f(2)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().await.insert(f(1), v(30));
		engine.inner.flow_creation_versions.write().await.insert(f(2), v(50));

		let mut input = BTreeMap::new();
		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
		input.insert(v(40), vec![(s(1), vec![mk_diff("d40")])]);
		input.insert(v(60), vec![(s(1), vec![mk_diff("d60")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(40));
		assert_eq!(f1_units[1].version, v(60));

		let f2_units = &normalized[&f(2)];
		assert_eq!(f2_units.len(), 1);
		assert_eq!(f2_units[0].version, v(60));
	}
920
	/// A flow with no registered creation version receives every version.
	#[tokio::test]
	async fn test_no_backfill_version_processes_all() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		let mut input = BTreeMap::new();
		input.insert(v(10), vec![(s(1), vec![mk_diff("d10")])]);
		input.insert(v(20), vec![(s(1), vec![mk_diff("d20")])]);
		input.insert(v(30), vec![(s(1), vec![mk_diff("d30")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 3);
	}
938
	/// Boundary check: the creation version itself IS processed (filter is
	/// strictly `version < creation_version`); only versions below it drop.
	#[tokio::test]
	async fn test_backfill_version_exact_boundary() {
		let mut subscriptions = HashMap::new();
		subscriptions.insert(s(1), vec![f(1)]);
		let engine = setup_test_engine(subscriptions);

		engine.inner.flow_creation_versions.write().await.insert(f(1), v(100));

		let mut input = BTreeMap::new();
		input.insert(v(99), vec![(s(1), vec![mk_diff("d99")])]);
		input.insert(v(100), vec![(s(1), vec![mk_diff("d100")])]);
		input.insert(v(101), vec![(s(1), vec![mk_diff("d101")])]);

		let result = engine.create_partition(input).await;
		let normalized = normalize(result);

		let f1_units = &normalized[&f(1)];
		assert_eq!(f1_units.len(), 2);
		assert_eq!(f1_units[0].version, v(100));
		assert_eq!(f1_units[1].version, v(101));
	}
960}