Skip to main content

openjd_model/job/
step_param_space.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Step parameter space iteration.
6//!
7//! Provides `StepParameterSpaceIterator` for lazily iterating over the
8//! multidimensional space of task parameter values. Operates on resolved
9//! `job::StepParameterSpace` types (no SymbolTable needed).
10//!
11//! Uses a tree of `Node` objects for lazy evaluation:
12//! - `RangeExprNode`: computes values on demand via index arithmetic
13//! - `ProductNode`: divmod indexing (rightmost moves fastest)
14//! - `AssociationNode`: lockstep indexing
15//! - `StaticChunkNode`: pre-computed chunk boundaries
16
17use std::collections::HashSet;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::Arc;
20
21use openjd_expr::value::Float64;
22use openjd_expr::{ExprValue, RangeExpr};
23
24use crate::error::ModelError;
25use crate::job;
26use crate::template::RangeConstraint;
27use crate::types::{TaskParameterSet, TaskParameterType, TaskParameterValue};
28
29// ── Shared utilities ──
30
31/// Compute the product of child node lengths with overflow checking.
32fn checked_product_len(children: &[Box<dyn Node>]) -> Result<usize, ModelError> {
33    children.iter().try_fold(1usize, |acc, c| {
34        acc.checked_mul(c.len()).ok_or_else(|| {
35            ModelError::DecodeValidation(
36                "Total parameter space size overflow: the product of parameter dimensions is too large.".into(),
37            )
38        })
39    })
40}
41
/// Split a combination expression into its tokens.
///
/// The characters `*`, `(`, `)` and `,` are each emitted as a one-character
/// token. Whitespace separates tokens and is never emitted. Every other run of
/// characters is emitted as a single identifier token.
fn tokenize(expr: &str) -> Vec<String> {
    /// Move the pending identifier (if any) into the output list.
    fn flush(pending: &mut String, out: &mut Vec<String>) {
        if !pending.is_empty() {
            out.push(std::mem::take(pending));
        }
    }

    let mut out: Vec<String> = Vec::new();
    let mut pending = String::new();
    for ch in expr.chars() {
        if matches!(ch, '*' | '(' | ')' | ',') {
            // An operator ends the identifier in progress and is a token itself.
            flush(&mut pending, &mut out);
            out.push(String::from(ch));
        } else if ch.is_whitespace() {
            flush(&mut pending, &mut out);
        } else {
            pending.push(ch);
        }
    }
    flush(&mut pending, &mut out);
    out
}
67
/// Compress a slice of integers into a compact range expression string.
/// e.g., [1,2,3,5,7,8,9] → "1-3,5,7-9"
///
/// Runs of three or more values with a constant positive step are written as
/// `first-last` (step 1) or `first-last:step`. Everything else is written as
/// individual values. Parts are joined with `,`. An empty slice yields an
/// empty string.
///
/// Differences between neighbouring values are computed with `checked_sub`, so
/// inputs spanning more than `i64::MAX` (for example `[i64::MIN, 0, i64::MAX]`)
/// are emitted as individual values instead of overflowing.
fn compress_range_expr(values: &[i64]) -> String {
    if values.is_empty() {
        return String::new();
    }

    // Detect runs with a constant step. A run needs 3+ values to use range notation;
    // with only 2 values, any step is trivially valid so we don't commit to it.
    let mut parts: Vec<String> = Vec::new();
    let mut i = 0;
    while i < values.len() {
        if i + 2 < values.len() {
            // `None` when the difference overflows or is not strictly positive.
            let step = values[i + 1].checked_sub(values[i]).filter(|s| *s > 0);
            if let Some(step) = step {
                if values[i + 2].checked_sub(values[i + 1]) == Some(step) {
                    // Found a run of at least 3 with constant step; extend it greedily.
                    let mut end = i + 2;
                    while end + 1 < values.len()
                        && values[end + 1].checked_sub(values[end]) == Some(step)
                    {
                        end += 1;
                    }
                    if step == 1 {
                        parts.push(format!("{}-{}", values[i], values[end]));
                    } else {
                        parts.push(format!("{}-{}:{}", values[i], values[end], step));
                    }
                    i = end + 1;
                    continue;
                }
            }
        }
        parts.push(values[i].to_string());
        i += 1;
    }
    parts.join(",")
}
105
106/// Build a `RangeExpr` for chunk `i` given the chunk layout parameters.
107/// Used by `StaticChunkNode` and `StaticChunkIterator` for noncontiguous chunking.
108///
109/// - `range`: the full integer range being chunked
110/// - `constraint`: whether chunks must be contiguous (`1-10`) or can be non-contiguous (`1,3,7-10`)
111/// - `small`: base chunk size (`total_values / num_chunks`)
112/// - `leftovers`: how many of the first chunks get one extra element (`total_values % num_chunks`)
113/// - `i`: zero-based chunk index to build
114fn build_chunk_range_expr(
115    range: &job::TaskParamRange<i64>,
116    constraint: &RangeConstraint,
117    small: usize,
118    leftovers: usize,
119    i: usize,
120) -> RangeExpr {
121    let size = small + if i < leftovers { 1 } else { 0 };
122    let offset = i * small + i.min(leftovers);
123    let build = |vals: &[i64]| -> RangeExpr {
124        let range_str = match constraint {
125            RangeConstraint::Contiguous => {
126                if vals.len() == 1 {
127                    vals[0].to_string()
128                } else {
129                    format!("{}-{}", vals[0], vals[vals.len() - 1])
130                }
131            }
132            RangeConstraint::Noncontiguous => compress_range_expr(vals),
133        };
134        let expr = range_str
135            .parse::<RangeExpr>()
136            .expect("range string built from valid integers");
137        match constraint {
138            RangeConstraint::Contiguous => expr.with_contiguous(true),
139            RangeConstraint::Noncontiguous => expr,
140        }
141    };
142    match range {
143        job::TaskParamRange::RangeExpr(r) => {
144            let vals: Vec<i64> = (offset..offset + size)
145                .map(|j| r.get(j as i64).expect("chunk element within range bounds"))
146                .collect();
147            build(&vals)
148        }
149        job::TaskParamRange::List(values) => build(&values[offset..offset + size]),
150    }
151}
152
153// ── Node trait and implementations ──
154
/// Internal trait for lazy parameter space tree nodes.
///
/// Implementors must be `Send + Sync`.
trait Node: Send + Sync {
    /// Number of elements this node reports.
    fn len(&self) -> usize;
    /// Write the parameter value(s) of element `index` into `result`.
    ///
    /// Some implementations are sequential-only and leave `result` untouched;
    /// those must be consumed through `iter()` instead.
    fn get(&self, index: usize, result: &mut TaskParameterSet);
    /// Validate containment with a detailed error message on failure.
    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String>;
    /// Create an iterator over this node's elements.
    fn iter(&self) -> Box<dyn NodeIterator>;
}
164
/// Iterator trait for node-level iteration (supports adaptive chunking).
trait NodeIterator {
    /// Advance by one element.
    ///
    /// Returns `false` once the iterator is exhausted and `true` otherwise.
    /// Value-producing implementations insert the element's parameter value
    /// into `result`; index-only implementations leave `result` untouched.
    fn next(&mut self, result: &mut TaskParameterSet) -> bool;
    /// Rewind so that the following `next` call starts from the first element.
    fn reset(&mut self);
}
170
171/// Simple index-based iterator for non-adaptive nodes.
172/// Tracks only index and length — the caller (ProductIterator/AssociationIterator)
173/// is responsible for calling `get()` on the original node to populate results.
174struct IndexedNodeIterator {
175    len: usize,
176    index: usize,
177}
178
179impl NodeIterator for IndexedNodeIterator {
180    fn next(&mut self, _result: &mut TaskParameterSet) -> bool {
181        if self.index >= self.len {
182            return false;
183        }
184        self.index += 1;
185        true
186    }
187    fn reset(&mut self) {
188        self.index = 0;
189    }
190}
191
192/// Value-producing iterator for a single parameter with a list of values.
193struct RangeListIterator {
194    name: String,
195    param_type: TaskParameterType,
196    values: Vec<ExprValue>,
197    index: usize,
198}
199
200impl NodeIterator for RangeListIterator {
201    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
202        if self.index >= self.values.len() {
203            return false;
204        }
205        result.insert(
206            self.name.clone(),
207            TaskParameterValue {
208                param_type: self.param_type,
209                value: self.values[self.index].clone(),
210            },
211        );
212        self.index += 1;
213        true
214    }
215    fn reset(&mut self) {
216        self.index = 0;
217    }
218}
219
220/// Value-producing iterator for a single parameter with a RangeExpr.
221struct RangeExprIterator {
222    name: String,
223    range: RangeExpr,
224    index: usize,
225}
226
227impl NodeIterator for RangeExprIterator {
228    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
229        if self.index >= self.range.len() {
230            return false;
231        }
232        result.insert(
233            self.name.clone(),
234            TaskParameterValue {
235                param_type: TaskParameterType::Int,
236                value: ExprValue::Int(
237                    self.range
238                        .get(self.index as i64)
239                        .expect("index checked against range.len()"),
240                ),
241            },
242        );
243        self.index += 1;
244        true
245    }
246    fn reset(&mut self) {
247        self.index = 0;
248    }
249}
250
251/// Value-producing iterator for static chunk nodes.
252struct StaticChunkIterator {
253    name: String,
254    range: job::TaskParamRange<i64>,
255    constraint: RangeConstraint,
256    num_chunks: usize,
257    small: usize,
258    leftovers: usize,
259    index: usize,
260}
261
262impl StaticChunkIterator {
263    fn chunk_range_expr(&self, i: usize) -> RangeExpr {
264        build_chunk_range_expr(&self.range, &self.constraint, self.small, self.leftovers, i)
265    }
266}
267
268impl NodeIterator for StaticChunkIterator {
269    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
270        if self.index >= self.num_chunks {
271            return false;
272        }
273        result.insert(
274            self.name.clone(),
275            TaskParameterValue {
276                param_type: TaskParameterType::ChunkInt,
277                value: ExprValue::RangeExpr(self.chunk_range_expr(self.index)),
278            },
279        );
280        self.index += 1;
281        true
282    }
283    fn reset(&mut self) {
284        self.index = 0;
285    }
286}
287
/// Contiguous chunking node: splits values into chunks that respect gaps.
/// Contiguous runs in the source range are identified, then each run is
/// chunked independently. Uses index-based access to avoid materializing values.
struct ContiguousChunkNode {
    /// Parameter name used as the key in produced and validated parameter sets.
    name: String,
    /// Source values to be chunked.
    range: job::TaskParamRange<i64>,
    /// Target number of values per chunk; the constructor clamps it to at least 1.
    default_task_count: usize,
    /// Number of chunks the node reports from `len()`, computed once at construction.
    num_chunks: usize, // cached exact count
    /// Number of individual values in `range`.
    total_len: usize,
}
298
299/// Count contiguous chunks by walking the range's sub-ranges.
300/// For `RangeExpr`, uses the internal `IntRange` structure for O(R) where R is the
301/// number of sub-ranges (not the number of values). For `List`, scans values in O(N).
302fn count_contiguous_chunks_for_range(
303    range: &job::TaskParamRange<i64>,
304    default_task_count: usize,
305) -> usize {
306    match range {
307        job::TaskParamRange::List(v) => {
308            if v.is_empty() {
309                return 0;
310            }
311            let mut total = 0usize;
312            let mut interval_start = 0usize;
313            for i in 0..v.len() - 1 {
314                if v[i + 1] != v[i] + 1 {
315                    let len = i - interval_start + 1;
316                    total += len.div_ceil(default_task_count);
317                    interval_start = i + 1;
318                }
319            }
320            total += (v.len() - interval_start).div_ceil(default_task_count);
321            total
322        }
323        job::TaskParamRange::RangeExpr(r) => {
324            count_contiguous_chunks_from_sub_ranges(r, default_task_count)
325        }
326    }
327}
328
329/// Count contiguous chunks by iterating the `IntRange` sub-ranges of a `RangeExpr`.
330/// Merges adjacent intervals and computes chunk counts arithmetically per interval.
331fn count_contiguous_chunks_from_sub_ranges(r: &RangeExpr, default_task_count: usize) -> usize {
332    let sub_ranges = r.ranges();
333    if sub_ranges.is_empty() {
334        return 0;
335    }
336
337    let mut total_chunks = 0usize;
338    // Track the current merged contiguous interval as (start_val, end_val)
339    let mut interval: Option<(i64, i64)> = None;
340
341    for sr in sub_ranges {
342        if sr.step == 1 {
343            // This sub-range is contiguous: values from sr.start to sr.end
344            match interval {
345                Some((is, ie)) if sr.start == ie + 1 => {
346                    // Extends the current interval
347                    interval = Some((is, sr.end));
348                }
349                Some((is, ie)) => {
350                    // Gap — flush the current interval
351                    let len = (ie - is + 1) as usize;
352                    total_chunks += len.div_ceil(default_task_count);
353                    interval = Some((sr.start, sr.end));
354                }
355                None => {
356                    interval = Some((sr.start, sr.end));
357                }
358            }
359        } else {
360            // Step > 1: each value is isolated (has gaps between them).
361            // We need to check if the first value merges with the current interval,
362            // then each subsequent value is its own interval.
363            let count = sr.len();
364            for idx in 0..count {
365                // SAFETY: idx is bounded by sr.len(), so get() always returns Some.
366                let val = sr.get(idx).expect("index within sub-range bounds");
367                match interval {
368                    Some((is, ie)) if val == ie + 1 => {
369                        interval = Some((is, val));
370                    }
371                    Some((is, ie)) => {
372                        let len = (ie - is + 1) as usize;
373                        total_chunks += len.div_ceil(default_task_count);
374                        interval = Some((val, val));
375                    }
376                    None => {
377                        interval = Some((val, val));
378                    }
379                }
380            }
381        }
382    }
383    // Flush final interval
384    if let Some((is, ie)) = interval {
385        let len = (ie - is + 1) as usize;
386        total_chunks += len.div_ceil(default_task_count);
387    }
388    total_chunks
389}
390
391impl ContiguousChunkNode {
392    fn new(name: String, range: job::TaskParamRange<i64>, default_task_count: usize) -> Self {
393        let total_len = match &range {
394            job::TaskParamRange::List(v) => v.len(),
395            job::TaskParamRange::RangeExpr(r) => r.len(),
396        };
397        let dtc = default_task_count.max(1);
398        let num_chunks = count_contiguous_chunks_for_range(&range, dtc);
399        Self {
400            name,
401            range,
402            default_task_count: dtc,
403            num_chunks,
404            total_len,
405        }
406    }
407}
408
409impl Node for ContiguousChunkNode {
410    fn len(&self) -> usize {
411        self.num_chunks
412    }
413    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {
414        // Sequential-only; use iter()
415    }
416    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
417        let v = params.get(&self.name).ok_or_else(|| {
418            format!(
419                "Parameter '{}' not found in the provided parameters.",
420                self.name
421            )
422        })?;
423        match &v.value {
424            ExprValue::RangeExpr(r) => {
425                // Check by iterating chunks
426                for chunk in ContiguousChunkIterState::new(self) {
427                    if chunk == *r {
428                        return Ok(());
429                    }
430                }
431                Err(format!(
432                    "Parameter '{}' value '{}' is not a valid chunk in the parameter space.",
433                    self.name, r
434                ))
435            }
436            _ => Err(format!(
437                "Parameter '{}' value '{}' is not in the parameter space range.",
438                self.name,
439                v.value.to_display_string()
440            )),
441        }
442    }
443    fn iter(&self) -> Box<dyn NodeIterator> {
444        Box::new(ContiguousChunkNodeIterator {
445            state: ContiguousChunkIterState::new(self),
446            name: self.name.clone(),
447        })
448    }
449}
450
/// Reusable state for iterating contiguous chunks from a range.
/// Finds contiguous intervals, then divides each interval evenly into chunks
/// matching the Python `divide_int_interval_into_chunks` algorithm.
struct ContiguousChunkIterState {
    /// Source values being chunked (cloned from the node at construction).
    range: job::TaskParamRange<i64>,
    /// Target chunk size: an interval of `n` values is split into
    /// `ceil(n / default_task_count)` chunks.
    default_task_count: usize,
    /// Number of values in `range`.
    total_len: usize,
    /// Index into `range` of the first value not yet assigned to an interval.
    cursor: usize,
    // Current interval chunking state
    interval_start_val: i64, // first value of current interval
    interval_chunks_remaining: usize, // chunks of the current interval not yet emitted
    interval_pos: i64, // next value to emit within interval
    interval_small: usize, // base chunk size within the current interval
    interval_leftovers: usize, // number of chunks in the current interval that get one extra value
    interval_chunk_index: usize, // zero-based index of the next chunk within the current interval
    interval_chunk_count: usize, // total number of chunks in the current interval
}
468
469impl ContiguousChunkIterState {
470    fn new(node: &ContiguousChunkNode) -> Self {
471        Self {
472            range: node.range.clone(),
473            default_task_count: node.default_task_count,
474            total_len: node.total_len,
475            cursor: 0,
476            interval_start_val: 0,
477            interval_chunks_remaining: 0,
478            interval_pos: 0,
479            interval_small: 0,
480            interval_leftovers: 0,
481            interval_chunk_index: 0,
482            interval_chunk_count: 0,
483        }
484    }
485
486    fn get_value(&self, i: usize) -> i64 {
487        match &self.range {
488            job::TaskParamRange::List(v) => v[i],
489            // i is always bounded by the range length via cursor/total_len checks in callers.
490            job::TaskParamRange::RangeExpr(r) => {
491                r.get(i as i64).expect("index within range bounds")
492            }
493        }
494    }
495
496    /// Find the last index of the contiguous interval starting at `start`.
497    /// For `RangeExpr`, uses sub-range structure to skip step-1 ranges in O(R).
498    /// For `List`, scans values in O(interval_len).
499    fn find_interval_end(&self, start: usize) -> usize {
500        match &self.range {
501            job::TaskParamRange::List(v) => {
502                let mut end = start;
503                while end + 1 < v.len() && v[end + 1] == v[end] + 1 {
504                    end += 1;
505                }
506                end
507            }
508            job::TaskParamRange::RangeExpr(r) => {
509                // Use sub-ranges: find which sub-range contains `start`, then
510                // walk forward through step-1 sub-ranges that are adjacent.
511                let cumulative = r.cumulative_lengths();
512                let sub_ranges = r.ranges();
513
514                // Binary search for the sub-range containing `start`
515                let sr_idx = cumulative.partition_point(|&c| c <= start);
516                let sr_offset = if sr_idx == 0 {
517                    0
518                } else {
519                    cumulative[sr_idx - 1]
520                };
521
522                let sr = &sub_ranges[sr_idx];
523
524                if sr.step != 1 {
525                    // Step > 1: each value is isolated
526                    return start;
527                }
528
529                // Current sub-range is step-1: interval extends to end of this sub-range
530                let mut end = sr_offset + sr.len() - 1;
531
532                // Check subsequent sub-ranges for adjacency
533                let mut last_val = sr.end;
534                for next_sr in &sub_ranges[sr_idx + 1..] {
535                    if next_sr.start == last_val + 1 && next_sr.step == 1 {
536                        end += next_sr.len();
537                        last_val = next_sr.end;
538                    } else if next_sr.start == last_val + 1 && next_sr.step > 1 {
539                        // First value is adjacent, but subsequent values have gaps
540                        end += 1;
541                        break;
542                    } else {
543                        break;
544                    }
545                }
546                end
547            }
548        }
549    }
550
551    /// Advance cursor to find the next contiguous interval and set up chunking state.
552    fn start_next_interval(&mut self) -> bool {
553        if self.cursor >= self.total_len {
554            return false;
555        }
556        let first = self.get_value(self.cursor);
557
558        // Find end of contiguous interval efficiently
559        let end_idx = self.find_interval_end(self.cursor);
560        let last = self.get_value(end_idx);
561        let interval_len = (last - first + 1) as usize;
562        self.cursor = end_idx + 1;
563
564        // Compute even chunk distribution for this interval
565        let chunk_count = interval_len.div_ceil(self.default_task_count);
566        let (small, leftovers) = if chunk_count >= interval_len {
567            (1, 0)
568        } else if chunk_count <= 1 {
569            (interval_len, 0)
570        } else {
571            (interval_len / chunk_count, interval_len % chunk_count)
572        };
573
574        self.interval_start_val = first;
575        self.interval_pos = first;
576        self.interval_chunks_remaining = chunk_count;
577        self.interval_small = small;
578        self.interval_leftovers = leftovers;
579        self.interval_chunk_index = 0;
580        self.interval_chunk_count = chunk_count;
581        true
582    }
583
584    fn next_chunk(&mut self) -> Option<RangeExpr> {
585        // If no chunks remaining in current interval, find next interval
586        while self.interval_chunks_remaining == 0 {
587            if !self.start_next_interval() {
588                return None;
589            }
590        }
591
592        // Compute chunk size using Python's even distribution:
593        // chunk_sizes[(i * chunk_count) // leftovers] += 1
594        let mut size = self.interval_small;
595        if self.interval_leftovers > 0
596            && (self.interval_chunk_index * self.interval_chunk_count) / self.interval_leftovers
597                != ((self.interval_chunk_index + 1) * self.interval_chunk_count)
598                    / self.interval_leftovers
599        {
600            // This is a simpler equivalent: check if this index gets a +1
601            // by testing if floor((i+1)*count/left) > floor(i*count/left)
602        }
603        // Actually, replicate the Python algorithm directly:
604        // chunk_sizes = [small] * chunk_count
605        // for i in range(leftovers): chunk_sizes[(i * chunk_count) // leftovers] += 1
606        // Check if current chunk_index is one of the +1 slots
607        if self.interval_leftovers > 0 {
608            let idx = self.interval_chunk_index;
609            let cc = self.interval_chunk_count;
610            let lo = self.interval_leftovers;
611            // The +1 slots are at indices: (i * cc) // lo for i in 0..lo
612            // Equivalently, idx gets +1 if there exists i such that (i * cc) / lo == idx
613            // which means: idx * lo <= i * cc < (idx + 1) * lo
614            // i.e., ceil(idx * lo / cc) <= i < ceil((idx+1) * lo / cc)
615            // If that range is non-empty, this index gets +1
616            let i_start = (idx * lo).div_ceil(cc);
617            let i_end = ((idx + 1) * lo).div_ceil(cc);
618            if i_start < i_end && i_start < lo {
619                size += 1;
620            }
621        }
622
623        let start = self.interval_pos;
624        let end = start + size as i64 - 1;
625        self.interval_pos = end + 1;
626        self.interval_chunks_remaining -= 1;
627        self.interval_chunk_index += 1;
628
629        let s = format!("{start}-{end}");
630        Some(
631            s.parse::<RangeExpr>()
632                .expect("valid range")
633                .with_contiguous(true),
634        )
635    }
636}
637
638impl Iterator for ContiguousChunkIterState {
639    type Item = RangeExpr;
640    fn next(&mut self) -> Option<RangeExpr> {
641        self.next_chunk()
642    }
643}
644
645/// NodeIterator wrapper for ContiguousChunkNode.
646struct ContiguousChunkNodeIterator {
647    state: ContiguousChunkIterState,
648    name: String,
649}
650
651impl NodeIterator for ContiguousChunkNodeIterator {
652    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
653        match self.state.next_chunk() {
654            Some(expr) => {
655                result.insert(
656                    self.name.clone(),
657                    TaskParameterValue {
658                        param_type: TaskParameterType::ChunkInt,
659                        value: ExprValue::RangeExpr(expr),
660                    },
661                );
662                true
663            }
664            None => false,
665        }
666    }
667    fn reset(&mut self) {
668        self.state.cursor = 0;
669        self.state.interval_chunks_remaining = 0;
670    }
671}
672
/// Zero-dimensional space: produces one empty parameter set.
struct ZeroDimSpaceNode;

impl Node for ZeroDimSpaceNode {
    /// Always one element: the single empty parameter set.
    fn len(&self) -> usize {
        1
    }
    /// Nothing to write: the only element carries no parameters.
    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {}
    /// Every parameter set is accepted; no parameter is checked.
    fn validate_containment(&self, _params: &TaskParameterSet) -> Result<(), String> {
        Ok(())
    }
    /// Iterator that reports exactly one position and writes no values.
    fn iter(&self) -> Box<dyn NodeIterator> {
        Box::new(IndexedNodeIterator { len: 1, index: 0 })
    }
}
688
689/// Wraps a parameter name + pre-materialized list of values.
690struct RangeListNode {
691    name: String,
692    param_type: TaskParameterType,
693    values: Vec<ExprValue>,
694}
695
696impl Node for RangeListNode {
697    fn len(&self) -> usize {
698        self.values.len()
699    }
700    fn get(&self, index: usize, result: &mut TaskParameterSet) {
701        result.insert(
702            self.name.clone(),
703            TaskParameterValue {
704                param_type: self.param_type,
705                value: self.values[index].clone(),
706            },
707        );
708    }
709    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
710        let v = params.get(&self.name).ok_or_else(|| {
711            format!(
712                "Parameter '{}' not found in the provided parameters.",
713                self.name
714            )
715        })?;
716        if self.param_type == TaskParameterType::ChunkInt {
717            // Chunk: value must be a RangeExpr whose elements are all in our range
718            match &v.value {
719                ExprValue::RangeExpr(r) => {
720                    for val in r.iter() {
721                        if !self
722                            .values
723                            .iter()
724                            .any(|ev| matches!(ev, ExprValue::Int(i) if *i == val))
725                        {
726                            return Err(format!(
727                                "Parameter '{}' value '{}' is not a subset of the range in the parameter space.",
728                                self.name, r
729                            ));
730                        }
731                    }
732                    Ok(())
733                }
734                _ => Err(format!(
735                    "Parameter '{}' value '{}' is not in the parameter space range.",
736                    self.name,
737                    v.value.to_display_string()
738                )),
739            }
740        } else if !self.values.iter().any(|ev| expr_value_eq(ev, &v.value)) {
741            Err(format!(
742                "Parameter '{}' value '{}' is not in the parameter space range.",
743                self.name,
744                v.value.to_display_string()
745            ))
746        } else {
747            Ok(())
748        }
749    }
750    fn iter(&self) -> Box<dyn NodeIterator> {
751        Box::new(RangeListIterator {
752            name: self.name.clone(),
753            param_type: self.param_type,
754            values: self.values.clone(),
755            index: 0,
756        })
757    }
758}
759
760/// Wraps a parameter name + `RangeExpr`; computes values on demand.
761struct RangeExprNode {
762    name: String,
763    range: RangeExpr,
764}
765
766impl Node for RangeExprNode {
767    fn len(&self) -> usize {
768        self.range.len()
769    }
770    fn get(&self, index: usize, result: &mut TaskParameterSet) {
771        let val = self
772            .range
773            .get(index as i64)
774            .expect("caller must pass index < self.range.len()");
775        result.insert(
776            self.name.clone(),
777            TaskParameterValue {
778                param_type: TaskParameterType::Int,
779                value: ExprValue::Int(val),
780            },
781        );
782    }
783    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
784        let v = params.get(&self.name).ok_or_else(|| {
785            format!(
786                "Parameter '{}' not found in the provided parameters.",
787                self.name
788            )
789        })?;
790        match &v.value {
791            ExprValue::Int(i) => {
792                if self.range.contains(*i) {
793                    Ok(())
794                } else {
795                    Err(format!(
796                        "Parameter '{}' value '{}' is not in the parameter space range.",
797                        self.name, i
798                    ))
799                }
800            }
801            _ => Err(format!(
802                "Parameter '{}' value '{}' is not in the parameter space range.",
803                self.name,
804                v.value.to_display_string()
805            )),
806        }
807    }
808    fn iter(&self) -> Box<dyn NodeIterator> {
809        Box::new(RangeExprIterator {
810            name: self.name.clone(),
811            range: self.range.clone(),
812            index: 0,
813        })
814    }
815}
816
817/// Wraps a parameter name + pre-computed chunk `RangeExpr`s.
818struct StaticChunkNode {
819    name: String,
820    range: job::TaskParamRange<i64>,
821    constraint: RangeConstraint,
822    num_chunks: usize,
823    small: usize,     // base chunk size = total / num_chunks
824    leftovers: usize, // first `leftovers` chunks get size small+1
825}
826
827impl StaticChunkNode {
828    /// Build a RangeExpr for chunk `i` on the fly.
829    fn chunk_range_expr(&self, i: usize) -> RangeExpr {
830        build_chunk_range_expr(&self.range, &self.constraint, self.small, self.leftovers, i)
831    }
832}
833
834impl Node for StaticChunkNode {
835    fn len(&self) -> usize {
836        self.num_chunks
837    }
838    fn get(&self, index: usize, result: &mut TaskParameterSet) {
839        result.insert(
840            self.name.clone(),
841            TaskParameterValue {
842                param_type: TaskParameterType::ChunkInt,
843                value: ExprValue::RangeExpr(self.chunk_range_expr(index)),
844            },
845        );
846    }
847    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
848        let v = params.get(&self.name).ok_or_else(|| {
849            format!(
850                "Parameter '{}' not found in the provided parameters.",
851                self.name
852            )
853        })?;
854        match &v.value {
855            ExprValue::RangeExpr(r) => {
856                if (0..self.num_chunks).any(|i| self.chunk_range_expr(i) == *r) {
857                    Ok(())
858                } else {
859                    Err(format!(
860                        "Parameter '{}' value '{}' is not a valid chunk in the parameter space.",
861                        self.name, r
862                    ))
863                }
864            }
865            _ => Err(format!(
866                "Parameter '{}' value '{}' is not in the parameter space range.",
867                self.name,
868                v.value.to_display_string()
869            )),
870        }
871    }
872    fn iter(&self) -> Box<dyn NodeIterator> {
873        Box::new(StaticChunkIterator {
874            name: self.name.clone(),
875            range: self.range.clone(),
876            constraint: self.constraint.clone(),
877            num_chunks: self.num_chunks,
878            small: self.small,
879            leftovers: self.leftovers,
880            index: 0,
881        })
882    }
883}
884
885/// Cartesian product of children (rightmost moves fastest).
886struct ProductNode {
887    children: Vec<Box<dyn Node>>,
888    length: usize,
889}
890
891impl Node for ProductNode {
892    fn len(&self) -> usize {
893        self.length
894    }
895    fn get(&self, mut index: usize, result: &mut TaskParameterSet) {
896        for child in self.children.iter().rev() {
897            let child_len = child.len();
898            child.get(index % child_len, result);
899            index /= child_len;
900        }
901    }
902    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
903        for child in &self.children {
904            child.validate_containment(params)?;
905        }
906        Ok(())
907    }
908    fn iter(&self) -> Box<dyn NodeIterator> {
909        Box::new(ProductIterator::new(&self.children))
910    }
911}
912
913/// Iterator for ProductNode that composes child iterators.
914/// Non-adaptive children cycle through their values (rightmost fastest);
915/// the adaptive child (if any) advances when non-adaptive children wrap.
916struct ProductIterator {
917    children: Vec<ChildIterator>,
918    started: bool,
919}
920
921struct ChildIterator {
922    iter: Box<dyn NodeIterator>,
923    current: TaskParameterSet,
924}
925
926impl ProductIterator {
927    fn new(children: &[Box<dyn Node>]) -> Self {
928        let children = children
929            .iter()
930            .map(|child| ChildIterator {
931                iter: child.iter(),
932                current: TaskParameterSet::new(),
933            })
934            .collect();
935        Self {
936            children,
937            started: false,
938        }
939    }
940
941    /// Advance the first value from each child. Returns false if any child is empty.
942    fn initialize(&mut self) -> bool {
943        for child in &mut self.children {
944            if !child.iter.next(&mut child.current) {
945                return false;
946            }
947        }
948        true
949    }
950}
951
952impl NodeIterator for ProductIterator {
953    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
954        if !self.started {
955            self.started = true;
956            if !self.initialize() {
957                return false;
958            }
959        } else {
960            // Advance rightmost, carry left
961            let mut carry = true;
962            for child in self.children.iter_mut().rev() {
963                if !carry {
964                    break;
965                }
966                child.current.clear();
967                if child.iter.next(&mut child.current) {
968                    carry = false;
969                } else {
970                    // Exhausted — reset and advance to first value, carry continues
971                    child.iter.reset();
972                    if !child.iter.next(&mut child.current) {
973                        return false;
974                    }
975                }
976            }
977            if carry {
978                return false;
979            }
980        }
981        for child in &self.children {
982            result.extend(child.current.iter().map(|(k, v)| (k.clone(), v.clone())));
983        }
984        true
985    }
986    fn reset(&mut self) {
987        self.started = false;
988        for child in &mut self.children {
989            child.iter.reset();
990            child.current.clear();
991        }
992    }
993}
994
995/// Association: all children have the same length, indexed in lockstep.
996struct AssociationNode {
997    children: Vec<Box<dyn Node>>,
998    length: usize,
999}
1000
1001impl Node for AssociationNode {
1002    fn len(&self) -> usize {
1003        self.length
1004    }
1005    fn get(&self, index: usize, result: &mut TaskParameterSet) {
1006        for child in &self.children {
1007            child.get(index, result);
1008        }
1009    }
1010    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1011        // Linear scan: at least one index must match all children simultaneously
1012        for i in 0..self.length {
1013            let mut candidate = TaskParameterSet::new();
1014            for child in &self.children {
1015                child.get(i, &mut candidate);
1016            }
1017            if params_equal(&candidate, params) {
1018                return Ok(());
1019            }
1020        }
1021        // Build a display of the mismatched values
1022        let values: Vec<String> = params
1023            .iter()
1024            .filter(|(k, _)| {
1025                self.children.iter().any(|c| {
1026                    let mut ps = TaskParameterSet::new();
1027                    c.get(0, &mut ps);
1028                    ps.contains_key(*k)
1029                })
1030            })
1031            .map(|(k, v)| format!("{}={}", k, v.value.to_display_string()))
1032            .collect();
1033        Err(format!(
1034            "The values {{{}}}, of an association expression in the combination expression, do not appear in the parameter space.",
1035            values.join(", ")
1036        ))
1037    }
1038    fn iter(&self) -> Box<dyn NodeIterator> {
1039        Box::new(AssociationIterator::new(&self.children))
1040    }
1041}
1042
1043/// Iterator for AssociationNode: lockstep iteration of children.
1044struct AssociationIterator {
1045    children: Vec<ChildIterator>,
1046}
1047
1048impl AssociationIterator {
1049    fn new(children: &[Box<dyn Node>]) -> Self {
1050        let children = children
1051            .iter()
1052            .map(|child| ChildIterator {
1053                iter: child.iter(),
1054                current: TaskParameterSet::new(),
1055            })
1056            .collect();
1057        Self { children }
1058    }
1059}
1060
1061impl NodeIterator for AssociationIterator {
1062    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
1063        for child in &mut self.children {
1064            child.current.clear();
1065            if !child.iter.next(&mut child.current) {
1066                return false;
1067            }
1068            result.extend(child.current.iter().map(|(k, v)| (k.clone(), v.clone())));
1069        }
1070        true
1071    }
1072    fn reset(&mut self) {
1073        for child in &mut self.children {
1074            child.iter.reset();
1075            child.current.clear();
1076        }
1077    }
1078}
1079
1080/// Adaptive chunk node: produces chunks on the fly based on mutable `default_task_count`.
1081struct AdaptiveChunkNode {
1082    name: String,
1083    values: Vec<i64>,
1084    default_task_count: Arc<AtomicUsize>,
1085    range_constraint: RangeConstraint,
1086}
1087
1088impl Node for AdaptiveChunkNode {
1089    fn len(&self) -> usize {
1090        // Upper bound: one chunk per value. Actual count depends on runtime chunk size.
1091        // Used only for association length validation during construction.
1092        let dtc = self.default_task_count.load(Ordering::Relaxed).max(1);
1093        self.values.len().div_ceil(dtc)
1094    }
1095    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {
1096        // Random access not supported — use iter() instead.
1097    }
1098    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1099        let v = params.get(&self.name).ok_or_else(|| {
1100            format!(
1101                "Parameter '{}' not found in the provided parameters.",
1102                self.name
1103            )
1104        })?;
1105        match &v.value {
1106            ExprValue::RangeExpr(r) => {
1107                let valid: HashSet<i64> = self.values.iter().copied().collect();
1108                for val in r.iter() {
1109                    if !valid.contains(&val) {
1110                        return Err(format!(
1111                            "Parameter '{}' value '{}' is not a subset of the range in the parameter space.",
1112                            self.name, r
1113                        ));
1114                    }
1115                }
1116                Ok(())
1117            }
1118            _ => Err(format!(
1119                "Parameter '{}' value '{}' is not in the parameter space range.",
1120                self.name,
1121                v.value.to_display_string()
1122            )),
1123        }
1124    }
1125    fn iter(&self) -> Box<dyn NodeIterator> {
1126        Box::new(AdaptiveChunkIterator {
1127            name: self.name.clone(),
1128            values: self.values.clone(),
1129            default_task_count: self.default_task_count.clone(),
1130            range_constraint: self.range_constraint.clone(),
1131            cursor: 0,
1132        })
1133    }
1134}
1135
1136/// Iterator for adaptive chunk nodes.
1137struct AdaptiveChunkIterator {
1138    name: String,
1139    values: Vec<i64>,
1140    default_task_count: Arc<AtomicUsize>,
1141    range_constraint: RangeConstraint,
1142    cursor: usize,
1143}
1144
1145impl AdaptiveChunkIterator {
1146    fn make_chunk(&self, slice: &[i64]) -> RangeExpr {
1147        let range_str = match self.range_constraint {
1148            RangeConstraint::Contiguous => {
1149                if slice.len() == 1 {
1150                    slice[0].to_string()
1151                } else {
1152                    format!("{}-{}", slice[0], slice[slice.len() - 1])
1153                }
1154            }
1155            RangeConstraint::Noncontiguous => compress_range_expr(slice),
1156        };
1157        let expr = range_str
1158            .parse::<RangeExpr>()
1159            .expect("range string built from valid integers");
1160        match self.range_constraint {
1161            RangeConstraint::Contiguous => expr.with_contiguous(true),
1162            RangeConstraint::Noncontiguous => expr,
1163        }
1164    }
1165}
1166
1167impl NodeIterator for AdaptiveChunkIterator {
1168    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
1169        if self.cursor >= self.values.len() {
1170            return false;
1171        }
1172        let chunk_size = self.default_task_count.load(Ordering::Relaxed).max(1);
1173        let chunk = match self.range_constraint {
1174            RangeConstraint::Contiguous => {
1175                let start = self.cursor;
1176                let mut end = start + 1;
1177                while end < self.values.len()
1178                    && end - start < chunk_size
1179                    && self.values[end] == self.values[end - 1] + 1
1180                {
1181                    end += 1;
1182                }
1183                let slice = &self.values[start..end];
1184                self.cursor = end;
1185                self.make_chunk(slice)
1186            }
1187            RangeConstraint::Noncontiguous => {
1188                let end = (self.cursor + chunk_size).min(self.values.len());
1189                let slice = &self.values[self.cursor..end];
1190                self.cursor = end;
1191                self.make_chunk(slice)
1192            }
1193        };
1194        result.insert(
1195            self.name.clone(),
1196            TaskParameterValue {
1197                param_type: TaskParameterType::ChunkInt,
1198                value: ExprValue::RangeExpr(chunk),
1199            },
1200        );
1201        true
1202    }
1203    fn reset(&mut self) {
1204        self.cursor = 0;
1205    }
1206}
1207
1208// ── Public API ──
1209
1210/// Lazy iterator over a resolved step parameter space.
1211pub struct StepParameterSpaceIterator {
1212    root: Box<dyn Node>,
1213    names: HashSet<String>,
1214    current_index: usize,
1215    adaptive: bool,
1216    adaptive_chunk_size: Option<Arc<AtomicUsize>>,
1217    node_iter: Option<Box<dyn NodeIterator>>,
1218    chunks_param_name: Option<String>,
1219    /// True when iteration must be sequential (adaptive or contiguous chunking).
1220    sequential: bool,
1221}
1222
1223impl StepParameterSpaceIterator {
1224    /// Construct from a resolved `StepParameterSpace`.
1225    pub fn new(space: &job::StepParameterSpace) -> Result<Self, ModelError> {
1226        Self::new_inner(space, None)
1227    }
1228
1229    /// Create with an explicit chunk task count override.
1230    /// When `Some(1)`, disables adaptive chunking and counts individual tasks.
1231    pub fn new_with_chunk_override(
1232        space: &job::StepParameterSpace,
1233        override_count: Option<usize>,
1234    ) -> Result<Self, ModelError> {
1235        Self::new_inner(space, override_count)
1236    }
1237
1238    fn new_inner(
1239        space: &job::StepParameterSpace,
1240        chunk_override: Option<usize>,
1241    ) -> Result<Self, ModelError> {
1242        let names: HashSet<String> = space.task_parameter_definitions.keys().cloned().collect();
1243
1244        if space.task_parameter_definitions.is_empty() {
1245            return Ok(Self {
1246                root: Box::new(ZeroDimSpaceNode),
1247                names,
1248                current_index: 0,
1249                adaptive: false,
1250                adaptive_chunk_size: None,
1251                node_iter: None,
1252                chunks_param_name: None,
1253                sequential: false,
1254            });
1255        }
1256
1257        let expr = space.combination.as_deref().unwrap_or("*");
1258
1259        // Check if any parameter needs adaptive chunking
1260        let mut adaptive_info: Option<(String, Arc<AtomicUsize>)> = None;
1261        if chunk_override.is_none() {
1262            for (name, param) in &space.task_parameter_definitions {
1263                if let job::TaskParameter::ChunkInt { chunks, .. } = param {
1264                    if chunks.target_runtime_seconds.is_some_and(|t| t > 0) {
1265                        let arc = Arc::new(AtomicUsize::new(chunks.default_task_count.max(1)));
1266                        adaptive_info = Some((name.clone(), arc));
1267                        break;
1268                    }
1269                }
1270            }
1271        }
1272
1273        let root = if expr.trim() == "*" {
1274            // Default: no explicit combination — product of all params in definition order
1275            let mut children: Vec<Box<dyn Node>> = Vec::new();
1276            let mut adaptive_idx = None;
1277            for (i, name) in space.task_parameter_definitions.keys().enumerate() {
1278                if adaptive_info.as_ref().is_some_and(|(n, _)| n == name) {
1279                    adaptive_idx = Some(i);
1280                }
1281                children.push(make_leaf_node(name, space, &adaptive_info, chunk_override)?);
1282            }
1283            // Move adaptive child to the end (innermost/fastest-varying) to match Python
1284            if let Some(idx) = adaptive_idx {
1285                let child = children.remove(idx);
1286                children.push(child);
1287            }
1288            if children.len() == 1 {
1289                // SAFETY: We just checked len() == 1, so into_iter().next() always
1290                // returns Some. Using into_iter avoids an unwrap on pop().
1291                children
1292                    .into_iter()
1293                    .next()
1294                    .expect("non-empty vec with len 1")
1295            } else {
1296                let length = checked_product_len(&children)?;
1297                Box::new(ProductNode { children, length })
1298            }
1299        } else {
1300            let tokens = tokenize(expr);
1301            parse_node_expr(&tokens, space, &adaptive_info, chunk_override)?
1302        };
1303
1304        let adaptive = adaptive_info.is_some();
1305        let chunks_param_name = adaptive_info.as_ref().map(|(n, _)| n.clone());
1306        let adaptive_chunk_size = adaptive_info.map(|(_, rc)| rc);
1307
1308        // Use iterator path if any node requires sequential iteration
1309        // (adaptive chunking or contiguous chunking with gaps)
1310        let needs_sequential = adaptive || has_contiguous_chunks(space);
1311        let node_iter = if needs_sequential {
1312            Some(root.iter())
1313        } else {
1314            None
1315        };
1316
1317        Ok(Self {
1318            root,
1319            names,
1320            current_index: 0,
1321            adaptive,
1322            adaptive_chunk_size,
1323            node_iter,
1324            chunks_param_name,
1325            sequential: needs_sequential,
1326        })
1327    }
1328
1329    pub fn names(&self) -> &HashSet<String> {
1330        &self.names
1331    }
1332
1333    pub fn len(&self) -> usize {
1334        if self.adaptive {
1335            0
1336        } else {
1337            self.root.len()
1338        }
1339    }
1340
1341    pub fn is_empty(&self) -> bool {
1342        if self.adaptive {
1343            false
1344        } else {
1345            self.root.len() == 0
1346        }
1347    }
1348
1349    /// Random access to a specific task parameter set by index.
1350    /// Returns `None` for out-of-bounds or when sequential iteration is required.
1351    pub fn get(&self, index: usize) -> Option<TaskParameterSet> {
1352        if self.sequential {
1353            return None;
1354        }
1355        if index >= self.root.len() {
1356            return None;
1357        }
1358        let mut result = TaskParameterSet::new();
1359        self.root.get(index, &mut result);
1360        Some(result)
1361    }
1362
1363    /// Check if a parameter set is contained in this space.
1364    pub fn contains(&self, params: &TaskParameterSet) -> bool {
1365        self.validate_containment(params).is_ok()
1366    }
1367
1368    /// Validate that a parameter set is contained in this space.
1369    /// Returns a detailed error message if not.
1370    pub fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1371        let mut params_keys: Vec<&str> = params.keys().map(|s| s.as_str()).collect();
1372        let mut space_keys: Vec<&str> = self.names.iter().map(|s| s.as_str()).collect();
1373        params_keys.sort();
1374        space_keys.sort();
1375        if params_keys != space_keys {
1376            return Err(format!(
1377                "Task parameter names {:?} do not match the parameter space names {:?}.",
1378                params_keys, space_keys
1379            ));
1380        }
1381        self.root.validate_containment(params)
1382    }
1383
1384    /// Whether adaptive chunking is active.
1385    pub fn chunks_adaptive(&self) -> bool {
1386        self.adaptive
1387    }
1388
1389    /// The parameter name used for chunking, if any.
1390    pub fn chunks_parameter_name(&self) -> Option<&str> {
1391        self.chunks_param_name.as_deref()
1392    }
1393
1394    /// Current default_task_count for adaptive chunking.
1395    pub fn chunks_default_task_count(&self) -> Option<usize> {
1396        self.adaptive_chunk_size
1397            .as_ref()
1398            .map(|a| a.load(Ordering::Relaxed))
1399    }
1400
1401    /// Update the chunk size for adaptive chunking.
1402    pub fn set_chunks_default_task_count(&mut self, value: usize) {
1403        if let Some(ref a) = self.adaptive_chunk_size {
1404            a.store(value, Ordering::Relaxed);
1405            // The Arc<AtomicUsize> propagates to the live iterator — no reset needed.
1406        }
1407    }
1408}
1409
1410fn params_equal(a: &TaskParameterSet, b: &TaskParameterSet) -> bool {
1411    if a.len() != b.len() {
1412        return false;
1413    }
1414    a.iter().all(|(k, v)| {
1415        b.get(k)
1416            .is_some_and(|bv| expr_value_eq(&v.value, &bv.value))
1417    })
1418}
1419
1420fn expr_value_eq(a: &ExprValue, b: &ExprValue) -> bool {
1421    match (a, b) {
1422        (ExprValue::Int(x), ExprValue::Int(y)) => x == y,
1423        (ExprValue::Float(x), ExprValue::Float(y)) => x.value() == y.value(),
1424        (ExprValue::String(x), ExprValue::String(y)) => x == y,
1425        (ExprValue::RangeExpr(x), ExprValue::RangeExpr(y)) => x == y,
1426        (ExprValue::Path { value: x, .. }, ExprValue::Path { value: y, .. }) => x == y,
1427        (ExprValue::String(x), ExprValue::Path { value: y, .. }) => x == y,
1428        (ExprValue::Path { value: x, .. }, ExprValue::String(y)) => x == y,
1429        _ => false,
1430    }
1431}
1432
1433impl Iterator for StepParameterSpaceIterator {
1434    type Item = TaskParameterSet;
1435    fn next(&mut self) -> Option<TaskParameterSet> {
1436        if self.sequential {
1437            let iter = self.node_iter.as_mut()?;
1438            let mut result = TaskParameterSet::new();
1439            if iter.next(&mut result) {
1440                Some(result)
1441            } else {
1442                None
1443            }
1444        } else {
1445            let item = self.get(self.current_index)?;
1446            self.current_index += 1;
1447            Some(item)
1448        }
1449    }
1450
1451    fn size_hint(&self) -> (usize, Option<usize>) {
1452        if self.adaptive {
1453            (0, None)
1454        } else {
1455            let remaining = self.root.len().saturating_sub(self.current_index);
1456            (remaining, Some(remaining))
1457        }
1458    }
1459}
1460
1461// ── Node construction from combination expression ──
1462
1463fn parse_node_expr(
1464    tokens: &[String],
1465    space: &job::StepParameterSpace,
1466    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1467    chunk_override: Option<usize>,
1468) -> Result<Box<dyn Node>, ModelError> {
1469    let mut pos = 0;
1470    let result = parse_node_product(tokens, &mut pos, space, adaptive_info, chunk_override)?;
1471    if pos < tokens.len() {
1472        return Err(ModelError::DecodeValidation(format!(
1473            "Unexpected token '{}' in combination expression",
1474            tokens[pos]
1475        )));
1476    }
1477    Ok(result)
1478}
1479
1480fn parse_node_product(
1481    tokens: &[String],
1482    pos: &mut usize,
1483    space: &job::StepParameterSpace,
1484    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1485    chunk_override: Option<usize>,
1486) -> Result<Box<dyn Node>, ModelError> {
1487    let mut children = vec![parse_node_element(
1488        tokens,
1489        pos,
1490        space,
1491        adaptive_info,
1492        chunk_override,
1493    )?];
1494    while *pos < tokens.len() && tokens[*pos] == "*" {
1495        *pos += 1;
1496        children.push(parse_node_element(
1497            tokens,
1498            pos,
1499            space,
1500            adaptive_info,
1501            chunk_override,
1502        )?);
1503    }
1504    if children.len() == 1 {
1505        // SAFETY: We just checked len() == 1, so into_iter().next() always
1506        // returns Some. Using into_iter avoids an unwrap on pop().
1507        Ok(children
1508            .into_iter()
1509            .next()
1510            .expect("non-empty vec with len 1"))
1511    } else {
1512        let length = checked_product_len(&children)?;
1513        Ok(Box::new(ProductNode { children, length }))
1514    }
1515}
1516
1517fn parse_node_element(
1518    tokens: &[String],
1519    pos: &mut usize,
1520    space: &job::StepParameterSpace,
1521    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1522    chunk_override: Option<usize>,
1523) -> Result<Box<dyn Node>, ModelError> {
1524    if *pos >= tokens.len() {
1525        return Err(ModelError::DecodeValidation(
1526            "Unexpected end of combination expression".into(),
1527        ));
1528    }
1529    if tokens[*pos] == "(" {
1530        *pos += 1;
1531        let mut children = vec![parse_node_product(
1532            tokens,
1533            pos,
1534            space,
1535            adaptive_info,
1536            chunk_override,
1537        )?];
1538        while *pos < tokens.len() && tokens[*pos] == "," {
1539            *pos += 1;
1540            children.push(parse_node_product(
1541                tokens,
1542                pos,
1543                space,
1544                adaptive_info,
1545                chunk_override,
1546            )?);
1547        }
1548        if *pos >= tokens.len() || tokens[*pos] != ")" {
1549            return Err(ModelError::DecodeValidation(
1550                "Missing closing parenthesis in combination".into(),
1551            ));
1552        }
1553        *pos += 1;
1554        let length = children[0].len();
1555        for child in children.iter().skip(1) {
1556            if child.len() != length {
1557                return Err(ModelError::DecodeValidation(format!(
1558                    "Associative combination: all members must have the same number of values, got {} and {}",
1559                    length, child.len()
1560                )));
1561            }
1562        }
1563        if children.len() == 1 {
1564            Err(ModelError::DecodeValidation(
1565                "Association expression must have more than one term.".into(),
1566            ))
1567        } else {
1568            Ok(Box::new(AssociationNode { children, length }))
1569        }
1570    } else {
1571        let name = &tokens[*pos];
1572        *pos += 1;
1573        make_leaf_node(name, space, adaptive_info, chunk_override)
1574    }
1575}
1576
1577/// Create a leaf node for a parameter name from the resolved definitions.
1578fn make_leaf_node(
1579    name: &str,
1580    space: &job::StepParameterSpace,
1581    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1582    chunk_override: Option<usize>,
1583) -> Result<Box<dyn Node>, ModelError> {
1584    let param = space.task_parameter_definitions.get(name).ok_or_else(|| {
1585        ModelError::DecodeValidation(format!(
1586            "Unknown parameter '{name}' in combination expression"
1587        ))
1588    })?;
1589
1590    match param {
1591        job::TaskParameter::Int { range, chunks } => {
1592            if let Some(chunk_cfg) = chunks {
1593                return make_chunk_node(name, range, chunk_cfg, adaptive_info, chunk_override);
1594            }
1595            match range {
1596                job::TaskParamRange::List(v) => Ok(Box::new(RangeListNode {
1597                    name: name.to_string(),
1598                    param_type: TaskParameterType::Int,
1599                    values: v.iter().map(|&i| ExprValue::Int(i)).collect(),
1600                })),
1601                job::TaskParamRange::RangeExpr(r) => Ok(Box::new(RangeExprNode {
1602                    name: name.to_string(),
1603                    range: r.clone(),
1604                })),
1605            }
1606        }
1607        job::TaskParameter::Float { range } => Ok(Box::new(RangeListNode {
1608            name: name.to_string(),
1609            param_type: TaskParameterType::Float,
1610            values: range
1611                .iter()
1612                .map(|&f| {
1613                    Float64::new(f).map(ExprValue::Float).map_err(|_| {
1614                        ModelError::DecodeValidation(format!(
1615                            "Parameter '{name}': float value {f} is not finite"
1616                        ))
1617                    })
1618                })
1619                .collect::<Result<Vec<_>, _>>()?,
1620        })),
1621        job::TaskParameter::String { range } => Ok(Box::new(RangeListNode {
1622            name: name.to_string(),
1623            param_type: TaskParameterType::String,
1624            values: range.iter().map(|s| ExprValue::String(s.clone())).collect(),
1625        })),
1626        job::TaskParameter::Path { range } => Ok(Box::new(RangeListNode {
1627            name: name.to_string(),
1628            param_type: TaskParameterType::Path,
1629            values: range.iter().map(|s| ExprValue::String(s.clone())).collect(),
1630        })),
1631        job::TaskParameter::ChunkInt { range, chunks } => {
1632            make_chunk_node(name, range, chunks, adaptive_info, chunk_override)
1633        }
1634    }
1635}
1636
1637/// Check if any chunk parameter uses contiguous constraint (requires sequential iteration).
1638fn has_contiguous_chunks(space: &job::StepParameterSpace) -> bool {
1639    space.task_parameter_definitions.values().any(|p| {
1640        matches!(
1641            p,
1642            job::TaskParameter::ChunkInt { chunks, .. }
1643                if chunks.range_constraint == RangeConstraint::Contiguous
1644        )
1645    })
1646}
1647
1648/// Build a chunk node from a range and chunk config. Creates `AdaptiveChunkNode` when
1649/// `target_runtime_seconds > 0`, `ContiguousChunkNode` for contiguous static chunking,
1650/// or `StaticChunkNode` for noncontiguous static chunking.
1651fn make_chunk_node(
1652    name: &str,
1653    range: &job::TaskParamRange<i64>,
1654    chunks: &job::ResolvedChunks,
1655    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1656    chunk_override: Option<usize>,
1657) -> Result<Box<dyn Node>, ModelError> {
1658    // Check if this parameter should use adaptive chunking
1659    if let Some((adaptive_name, rc)) = adaptive_info {
1660        if adaptive_name == name {
1661            let values: Vec<i64> = match range {
1662                job::TaskParamRange::List(v) => v.clone(),
1663                job::TaskParamRange::RangeExpr(r) => r.iter().collect(),
1664            };
1665            return Ok(Box::new(AdaptiveChunkNode {
1666                name: name.to_string(),
1667                values,
1668                default_task_count: rc.clone(),
1669                range_constraint: chunks.range_constraint.clone(),
1670            }));
1671        }
1672    }
1673
1674    // Use override if provided, otherwise use the template's default
1675    let default_task_count = chunk_override.unwrap_or(chunks.default_task_count).max(1);
1676
1677    let total_len = match range {
1678        job::TaskParamRange::List(v) => v.len(),
1679        job::TaskParamRange::RangeExpr(r) => r.len(),
1680    };
1681    if total_len == 0 {
1682        return Ok(Box::new(RangeListNode {
1683            name: name.to_string(),
1684            param_type: TaskParameterType::ChunkInt,
1685            values: Vec::new(),
1686        }));
1687    }
1688
1689    // Contiguous chunking must respect gaps in the source range
1690    if chunks.range_constraint == RangeConstraint::Contiguous {
1691        return Ok(Box::new(ContiguousChunkNode::new(
1692            name.to_string(),
1693            range.clone(),
1694            default_task_count,
1695        )));
1696    }
1697
1698    let chunk_count = total_len.div_ceil(default_task_count);
1699    let small = total_len / chunk_count;
1700    let leftovers = total_len % chunk_count;
1701
1702    Ok(Box::new(StaticChunkNode {
1703        name: name.to_string(),
1704        range: range.clone(),
1705        constraint: chunks.range_constraint.clone(),
1706        num_chunks: chunk_count,
1707        small,
1708        leftovers,
1709    }))
1710}
1711
#[cfg(test)]
mod tests {
    use super::*;

    // ── Free-function helpers ──

    /// `compress_range_expr` collapses runs of consecutive integers into
    /// `start-end` spans and joins the pieces with commas.
    #[test]
    fn test_compress_range_expr() {
        // Degenerate inputs first, then runs, then a mix of runs and singletons.
        assert_eq!(compress_range_expr(&[]), "");
        assert_eq!(compress_range_expr(&[1]), "1");
        assert_eq!(compress_range_expr(&[1, 3]), "1,3");
        assert_eq!(compress_range_expr(&[1, 2, 3]), "1-3");
        assert_eq!(compress_range_expr(&[1, 2, 3, 5, 7, 8, 9]), "1-3,5,7-9");
    }

    /// `tokenize` splits a combination expression into identifier and
    /// operator tokens, dropping whitespace.
    #[test]
    fn test_tokenize() {
        assert_eq!(tokenize("A"), ["A"]);
        assert_eq!(tokenize("A * B"), ["A", "*", "B"]);
        assert_eq!(tokenize("(A, B) * C"), ["(", "A", ",", "B", ")", "*", "C"]);
    }

    // ── Fixture builders ──

    /// Assemble a `StepParameterSpace` from `(name, parameter)` pairs and an
    /// optional combination expression. Definitions are stored in an
    /// `IndexMap`, filled in the order the pairs are given.
    fn make_space(
        params: Vec<(&str, job::TaskParameter)>,
        combination: Option<&str>,
    ) -> job::StepParameterSpace {
        let task_parameter_definitions: indexmap::IndexMap<_, _> = params
            .into_iter()
            .map(|(name, param)| (name.to_string(), param))
            .collect();
        job::StepParameterSpace {
            task_parameter_definitions,
            combination: combination.map(String::from),
        }
    }

    /// Parse a range-expression literal, panicking if it is malformed.
    fn parse_range(expr: &str) -> RangeExpr {
        expr.parse::<RangeExpr>().unwrap()
    }

    /// Unchunked `Int` parameter backed by an explicit list of values.
    fn int_param(values: Vec<i64>) -> job::TaskParameter {
        let range = job::TaskParamRange::List(values);
        job::TaskParameter::Int {
            range,
            chunks: None,
        }
    }

    /// `ChunkInt` parameter over an explicit list, noncontiguous, with a
    /// target runtime set. The target runtime is what is expected to select
    /// adaptive chunking (the tests below check `chunks_adaptive()`).
    fn adaptive_chunk_param(values: Vec<i64>, default_task_count: usize) -> job::TaskParameter {
        let chunks = job::ResolvedChunks {
            default_task_count,
            target_runtime_seconds: Some(60),
            range_constraint: RangeConstraint::Noncontiguous,
        };
        job::TaskParameter::ChunkInt {
            range: job::TaskParamRange::List(values),
            chunks,
        }
    }

    /// Unchunked `Int` parameter backed by a parsed range expression.
    fn range_expr_param(expr: &str) -> job::TaskParameter {
        let range = job::TaskParamRange::RangeExpr(parse_range(expr));
        job::TaskParameter::Int {
            range,
            chunks: None,
        }
    }

    /// `ChunkInt` parameter over a parsed range expression with no target
    /// runtime (so chunk sizes are fixed) and a contiguous range constraint.
    fn static_chunk_param(expr: &str, default_task_count: usize) -> job::TaskParameter {
        let chunks = job::ResolvedChunks {
            default_task_count,
            target_runtime_seconds: None,
            range_constraint: RangeConstraint::Contiguous,
        };
        job::TaskParameter::ChunkInt {
            range: job::TaskParamRange::RangeExpr(parse_range(expr)),
            chunks,
        }
    }

    // ── Laziness tests ──
    // Each test below works on a range of one hundred billion elements.
    // Any code path that eagerly expands the range would exhaust memory or
    // never finish, so a passing test demonstrates lazy evaluation.

    /// Range expression spanning 100 billion integers.
    const HUGE_RANGE: &str = "1-100000000000";

    #[test]
    fn test_lazy_construction_range_expr() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let total = StepParameterSpaceIterator::new(&space).unwrap().len();
        assert_eq!(total, 100_000_000_000);
    }

    #[test]
    fn test_lazy_random_access_range_expr() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        // (index into the space, expected value of X): first and last element.
        for (index, expected) in [(0, 1), (99_999_999_999, 100_000_000_000)] {
            let set = iter.get(index).unwrap();
            assert_eq!(set["X"].value, ExprValue::Int(expected));
        }
    }

    #[test]
    fn test_lazy_product_with_huge_range() {
        let definitions = vec![
            ("A", int_param(vec![1, 2])),
            ("X", range_expr_param(HUGE_RANGE)),
        ];
        let space = make_space(definitions, None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert_eq!(iter.len(), 200_000_000_000);
        // Jump straight into the interior of the space.
        let interior = iter.get(50_000_000_000).unwrap();
        for key in ["A", "X"] {
            assert!(interior.contains_key(key));
        }
    }

    #[test]
    fn test_lazy_iterate_first_few_of_huge_range() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let mut iter = StepParameterSpaceIterator::new(&space).unwrap();
        for expected in [1, 2] {
            let params = iter.next().unwrap();
            assert_eq!(params["X"].value, ExprValue::Int(expected));
        }
    }

    #[test]
    fn test_lazy_product_iterate_first_few() {
        let definitions = vec![
            ("A", int_param(vec![10, 20])),
            ("X", range_expr_param(HUGE_RANGE)),
        ];
        let space = make_space(definitions, None);
        let mut iter = StepParameterSpaceIterator::new(&space).unwrap();
        // Only key presence is checked for the first item; the concrete
        // values are deliberately not pinned here.
        let head = iter.next().unwrap();
        for key in ["A", "X"] {
            assert!(head.contains_key(key));
        }
        // Pulling a handful more items must complete promptly.
        for _step in 0..10 {
            assert!(iter.next().is_some());
        }
    }

    #[test]
    fn test_lazy_static_chunk_with_huge_range() {
        // 100B values at 1000 per chunk gives 100M chunks, far too many to
        // precompute during construction.
        let space = make_space(vec![("C", static_chunk_param(HUGE_RANGE, 1000))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert_eq!(iter.len(), 100_000_000);
        // Only the leading chunks are pulled.
        let head = iter.take(3).collect::<Vec<_>>();
        assert_eq!(head.len(), 3);
        assert!(head[0].contains_key("C"));
    }

    #[test]
    fn test_lazy_iter_of_product_with_huge_range() {
        // Iterating a product that contains the huge range together with an
        // adaptive chunk parameter must not expand the huge child.
        let definitions = vec![
            ("A", int_param(vec![1, 2])),
            ("X", range_expr_param(HUGE_RANGE)),
            ("Chunk", adaptive_chunk_param(vec![10, 20, 30, 40], 2)),
        ];
        let space = make_space(definitions, None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(iter.chunks_adaptive());
        // Stop after five items; expanding X's 100B values would exhaust memory.
        let mut seen = 0;
        for params in iter.take(5) {
            for key in ["A", "X", "Chunk"] {
                assert!(params.contains_key(key));
            }
            seen += 1;
        }
        assert_eq!(seen, 5);
    }

    // ── Adaptive chunking tests ──

    #[test]
    fn test_len_returns_zero_for_adaptive_chunking() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 2))];
        let space = make_space(definitions, None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(iter.chunks_adaptive());
        assert_eq!(iter.len(), 0);
    }

    #[test]
    fn test_get_returns_none_for_adaptive_chunking() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 2))];
        let space = make_space(definitions, None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(iter.chunks_adaptive());
        assert!(iter.get(0).is_none());
    }

    #[test]
    fn test_adaptive_chunking_with_multiple_params_iterates() {
        let definitions = vec![
            ("Frame", int_param(vec![1, 2])),
            ("Chunk", adaptive_chunk_param(vec![10, 20, 30, 40], 2)),
        ];
        let space = make_space(definitions, None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(iter.chunks_adaptive());
        // The cap of 101 items guards against a runaway iterator; exactly
        // four items are expected.
        let mut seen = 0;
        for params in iter.take(101) {
            for key in ["Frame", "Chunk"] {
                assert!(params.contains_key(key));
            }
            seen += 1;
        }
        assert_eq!(seen, 4);
    }

    #[test]
    fn test_adaptive_chunking_single_param_iterates() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 3))];
        let space = make_space(definitions, None);
        let results = StepParameterSpaceIterator::new(&space)
            .unwrap()
            .collect::<Vec<_>>();
        assert_eq!(results.len(), 2);
    }

    #[test]
    fn test_adaptive_with_association_iterates() {
        let definitions = vec![
            ("Frame", int_param(vec![1, 2])),
            ("Chunk", adaptive_chunk_param(vec![10, 20], 1)),
        ];
        let space = make_space(definitions, Some("(Frame, Chunk)"));
        let results = StepParameterSpaceIterator::new(&space)
            .unwrap()
            .collect::<Vec<_>>();
        assert_eq!(results.len(), 2);
    }

    // ── validate_containment tests ──

    /// Pair a parameter type with a value.
    fn tpv(kind: TaskParameterType, val: ExprValue) -> TaskParameterValue {
        TaskParameterValue {
            param_type: kind,
            value: val,
        }
    }

    /// Build a `TaskParameterSet` holding one `Int` entry per `(name, value)` pair.
    fn int_params(entries: &[(&'static str, i64)]) -> TaskParameterSet {
        let mut params = TaskParameterSet::new();
        for &(name, value) in entries {
            params.insert(
                name.into(),
                tpv(TaskParameterType::Int, ExprValue::Int(value)),
            );
        }
        params
    }

    #[test]
    fn test_validate_containment_name_mismatch() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = int_params(&[("Wrong", 1)]);
        let err = iter.validate_containment(&params).unwrap_err();
        // The message must name both the supplied and the expected parameter.
        for needle in ["do not match", "Wrong", "Frame"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_value_not_in_range() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = int_params(&[("Frame", 99)]);
        let err = iter.validate_containment(&params).unwrap_err();
        for needle in ["Frame", "99", "not in the parameter space range"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_range_expr_value_not_in_range() {
        let space = make_space(vec![("X", range_expr_param("1-10"))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = int_params(&[("X", 99)]);
        let err = iter.validate_containment(&params).unwrap_err();
        for needle in ["X", "99", "not in the parameter space range"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_success() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = int_params(&[("Frame", 2)]);
        assert!(iter.validate_containment(&params).is_ok());
    }

    #[test]
    fn test_validate_containment_association_not_found() {
        let definitions = vec![("A", int_param(vec![1, 2])), ("B", int_param(vec![10, 20]))];
        let space = make_space(definitions, Some("(A, B)"));
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        // The association pairs A=1 with B=10 and A=2 with B=20, so the
        // cross pairing A=1, B=20 must be rejected.
        let params = int_params(&[("A", 1), ("B", 20)]);
        let err = iter.validate_containment(&params).unwrap_err();
        assert!(err.contains("association"), "got: {err}");
    }

    #[test]
    fn test_validate_containment_chunk_not_subset() {
        let space = make_space(vec![("C", static_chunk_param("1-10", 5))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        // A chunk of 1-99 reaches far outside the declared range 1-10.
        let oversized = ExprValue::RangeExpr(parse_range("1-99"));
        let mut params = TaskParameterSet::new();
        params.insert("C".into(), tpv(TaskParameterType::ChunkInt, oversized));
        let err = iter.validate_containment(&params).unwrap_err();
        for needle in ["C", "not"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    // ── Range-expression value lookup must not panic ──

    #[test]
    fn test_contiguous_chunk_stepped_range_iterates_without_panic() {
        // A stepped range (step 2) under a contiguous constraint; intended to
        // exercise indexed value lookup on a non-unit-step range during
        // chunk counting and iteration.
        let space = make_space(vec![("C", static_chunk_param("1-10:2", 2))], None);
        let chunks: Vec<_> = StepParameterSpaceIterator::new(&space).unwrap().collect();
        assert!(!chunks.is_empty(), "should produce at least one chunk");
        assert!(chunks.iter().all(|chunk| chunk.contains_key("C")));
    }

    #[test]
    fn test_range_expr_random_access_does_not_panic() {
        // Index-based lookup across every element of a small range: index i
        // must map to the value i + 1.
        let space = make_space(vec![("X", range_expr_param("1-5"))], None);
        let iter = StepParameterSpaceIterator::new(&space).unwrap();
        for (i, expected) in (0..5).zip(1i64..) {
            let set = iter.get(i).unwrap();
            assert_eq!(set["X"].value, ExprValue::Int(expected));
        }
    }
}