Skip to main content

openjd_model/job/
step_param_space.rs

1// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2// Copyright by contributors to this project.
3// SPDX-License-Identifier: (Apache-2.0 OR MIT)
4
5//! Step parameter space iteration.
6//!
7//! Provides `StepParameterSpaceIterator` for lazily iterating over the
8//! multidimensional space of task parameter values. Operates on resolved
9//! `job::StepParameterSpace` types (no SymbolTable needed).
10//!
11//! Uses a tree of `Node` objects for lazy evaluation:
12//! - `RangeExprNode`: computes values on demand via index arithmetic
13//! - `ProductNode`: divmod indexing (rightmost moves fastest)
14//! - `AssociationNode`: lockstep indexing
15//! - `StaticChunkNode`: pre-computed chunk boundaries
16
17use std::collections::HashSet;
18use std::sync::atomic::{AtomicUsize, Ordering};
19use std::sync::Arc;
20
21use openjd_expr::value::Float64;
22use openjd_expr::{ExprValue, RangeExpr};
23
24use crate::error::ModelError;
25use crate::job;
26use crate::template::RangeConstraint;
27use crate::types::{TaskParameterSet, TaskParameterType, TaskParameterValue};
28
29// ── Shared utilities ──
30
31/// Compute the product of child node lengths with overflow checking.
32fn checked_product_len(children: &[Box<dyn Node>]) -> Result<usize, ModelError> {
33    children.iter().try_fold(1usize, |acc, c| {
34        acc.checked_mul(c.len()).ok_or_else(|| {
35            ModelError::DecodeValidation(
36                "Total parameter space size overflow: the product of parameter dimensions is too large.".into(),
37            )
38        })
39    })
40}
41
/// Split a combination expression into its tokens.
///
/// The characters `*`, `(`, `)` and `,` are each emitted as their own
/// one-character token. Whitespace separates tokens and is never emitted.
/// Every other run of characters is emitted as a single identifier token.
fn tokenize(expr: &str) -> Vec<String> {
    /// Move the pending identifier (if any) into the output, leaving `pending` empty.
    fn flush(pending: &mut String, out: &mut Vec<String>) {
        if !pending.is_empty() {
            out.push(std::mem::take(pending));
        }
    }

    let mut out = Vec::new();
    let mut pending = String::new();
    for symbol in expr.chars() {
        if matches!(symbol, '*' | '(' | ')' | ',') {
            // An operator ends the identifier in progress and is a token itself.
            flush(&mut pending, &mut out);
            out.push(String::from(symbol));
        } else if symbol.is_whitespace() {
            flush(&mut pending, &mut out);
        } else {
            pending.push(symbol);
        }
    }
    flush(&mut pending, &mut out);
    out
}
67
/// Compress a slice of integers into a compact range expression string.
/// e.g., [1,2,3,5,7,8,9] → "1-3,5,7-9"
///
/// Runs of three or more values with the same positive step are written as
/// `start-end` (step 1) or `start-end:step`. Everything else is written as
/// individual values, joined by commas. With only two values any step would
/// trivially "fit", so pairs are never collapsed into a range.
///
/// An empty slice yields an empty string. Differences between neighbours are
/// computed with checked arithmetic: if a difference does not fit in `i64`,
/// the two values are simply treated as not belonging to a common run rather
/// than overflowing.
fn compress_range_expr(values: &[i64]) -> String {
    // Difference between the element after `idx` and the element at `idx`.
    // `None` when there is no next element or the subtraction would overflow.
    let gap = |idx: usize| -> Option<i64> { values.get(idx + 1)?.checked_sub(values[idx]) };

    let mut parts: Vec<String> = Vec::new();
    let mut i = 0;
    while i < values.len() {
        if let Some(step) = gap(i).filter(|&s| s > 0) {
            if gap(i + 1) == Some(step) {
                // Found a run of at least 3 with constant step; extend it as far as it goes.
                let mut end = i + 2;
                while gap(end) == Some(step) {
                    end += 1;
                }
                parts.push(if step == 1 {
                    format!("{}-{}", values[i], values[end])
                } else {
                    format!("{}-{}:{}", values[i], values[end], step)
                });
                i = end + 1;
                continue;
            }
        }
        parts.push(values[i].to_string());
        i += 1;
    }
    parts.join(",")
}
105
106/// Build a `RangeExpr` for chunk `i` given the chunk layout parameters.
107/// Used by `StaticChunkNode` and `StaticChunkIterator` for noncontiguous chunking.
108///
109/// - `range`: the full integer range being chunked
110/// - `constraint`: whether chunks must be contiguous (`1-10`) or can be non-contiguous (`1,3,7-10`)
111/// - `small`: base chunk size (`total_values / num_chunks`)
112/// - `leftovers`: how many of the first chunks get one extra element (`total_values % num_chunks`)
113/// - `i`: zero-based chunk index to build
114fn build_chunk_range_expr(
115    range: &job::TaskParamRange<i64>,
116    constraint: &RangeConstraint,
117    small: usize,
118    leftovers: usize,
119    i: usize,
120) -> RangeExpr {
121    let size = small + if i < leftovers { 1 } else { 0 };
122    let offset = i * small + i.min(leftovers);
123    let build = |vals: &[i64]| -> RangeExpr {
124        let range_str = match constraint {
125            RangeConstraint::Contiguous => {
126                if vals.len() == 1 {
127                    vals[0].to_string()
128                } else {
129                    format!("{}-{}", vals[0], vals[vals.len() - 1])
130                }
131            }
132            RangeConstraint::Noncontiguous => compress_range_expr(vals),
133        };
134        let expr = range_str
135            .parse::<RangeExpr>()
136            .expect("range string built from valid integers");
137        match constraint {
138            RangeConstraint::Contiguous => expr.with_contiguous(true),
139            RangeConstraint::Noncontiguous => expr,
140        }
141    };
142    match range {
143        job::TaskParamRange::RangeExpr(r) => {
144            let vals: Vec<i64> = (offset..offset + size)
145                .map(|j| r.get(j as i64).expect("chunk element within range bounds"))
146                .collect();
147            build(&vals)
148        }
149        job::TaskParamRange::List(values) => build(&values[offset..offset + size]),
150    }
151}
152
153// ── Node trait and implementations ──
154
/// Internal trait for lazy parameter space tree nodes.
///
/// A node represents an indexable collection of partial task parameter sets.
/// Implementations must be `Send + Sync`.
trait Node: Send + Sync {
    /// Number of elements this node contributes to the parameter space.
    fn len(&self) -> usize;
    /// Write the parameter value(s) for element `index` into `result`.
    ///
    /// Not every node supports random access: `ContiguousChunkNode` leaves
    /// `result` untouched and must be walked through `iter()` instead.
    fn get(&self, index: usize, result: &mut TaskParameterSet);
    /// Validate containment with a detailed error message on failure.
    ///
    /// Returns `Ok(())` when the values in `params` that this node is
    /// responsible for belong to the node; otherwise `Err` with a message
    /// describing the offending parameter.
    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String>;
    /// Create an iterator over this node's elements.
    fn iter(&self) -> Box<dyn NodeIterator>;
}
164
/// Iterator trait for node-level iteration (supports adaptive chunking).
///
/// Unlike `std::iter::Iterator`, elements are not returned by value: `next`
/// reports whether it advanced, and value-producing implementations write the
/// element into the caller-supplied parameter set.
trait NodeIterator: Send + Sync {
    /// Advance to the next element.
    ///
    /// Returns `false` once the iterator is exhausted. Value-producing
    /// implementations insert the element's parameter value into `result`;
    /// index-only implementations (`IndexedNodeIterator`) ignore it.
    fn next(&mut self, result: &mut TaskParameterSet) -> bool;
    /// Rewind so that the following `next` call starts from the first element again.
    fn reset(&mut self);
}
170
/// Simple index-based iterator for non-adaptive nodes.
/// Tracks only index and length — the caller (ProductIterator/AssociationIterator)
/// is responsible for calling `get()` on the original node to populate results.
struct IndexedNodeIterator {
    /// Total number of positions this iterator steps through.
    len: usize,
    /// Number of positions already consumed; `next` stops once this reaches `len`.
    index: usize,
}
178
179impl NodeIterator for IndexedNodeIterator {
180    fn next(&mut self, _result: &mut TaskParameterSet) -> bool {
181        if self.index >= self.len {
182            return false;
183        }
184        self.index += 1;
185        true
186    }
187    fn reset(&mut self) {
188        self.index = 0;
189    }
190}
191
/// Value-producing iterator for a single parameter with a list of values.
struct RangeListIterator {
    /// Parameter name used as the key when inserting into the result set.
    name: String,
    /// Type tag attached to every emitted value.
    param_type: TaskParameterType,
    /// The values to emit, in order.
    values: Vec<ExprValue>,
    /// Position in `values` of the next value to emit.
    index: usize,
}
199
200impl NodeIterator for RangeListIterator {
201    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
202        if self.index >= self.values.len() {
203            return false;
204        }
205        result.insert(
206            self.name.clone(),
207            TaskParameterValue {
208                param_type: self.param_type,
209                value: self.values[self.index].clone(),
210            },
211        );
212        self.index += 1;
213        true
214    }
215    fn reset(&mut self) {
216        self.index = 0;
217    }
218}
219
/// Value-producing iterator for a single parameter with a RangeExpr.
struct RangeExprIterator {
    /// Parameter name used as the key when inserting into the result set.
    name: String,
    /// The range whose elements are emitted one at a time, looked up by index.
    range: RangeExpr,
    /// Index into `range` of the next value to emit.
    index: usize,
}
226
227impl NodeIterator for RangeExprIterator {
228    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
229        if self.index >= self.range.len() {
230            return false;
231        }
232        result.insert(
233            self.name.clone(),
234            TaskParameterValue {
235                param_type: TaskParameterType::Int,
236                value: ExprValue::Int(
237                    self.range
238                        .get(self.index as i64)
239                        .expect("index checked against range.len()"),
240                ),
241            },
242        );
243        self.index += 1;
244        true
245    }
246    fn reset(&mut self) {
247        self.index = 0;
248    }
249}
250
/// Value-producing iterator for static chunk nodes.
///
/// Carries its own copy of the chunk layout so each chunk can be built on
/// demand via `build_chunk_range_expr`.
struct StaticChunkIterator {
    /// Parameter name used as the key when inserting into the result set.
    name: String,
    /// The full integer range being chunked.
    range: job::TaskParamRange<i64>,
    /// Whether emitted chunks are rendered as contiguous or non-contiguous expressions.
    constraint: RangeConstraint,
    /// Total number of chunks to emit.
    num_chunks: usize,
    /// Base chunk size.
    small: usize,
    /// Number of leading chunks that get one extra element.
    leftovers: usize,
    /// Zero-based index of the next chunk to emit.
    index: usize,
}
261
262impl StaticChunkIterator {
263    fn chunk_range_expr(&self, i: usize) -> RangeExpr {
264        build_chunk_range_expr(&self.range, &self.constraint, self.small, self.leftovers, i)
265    }
266}
267
268impl NodeIterator for StaticChunkIterator {
269    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
270        if self.index >= self.num_chunks {
271            return false;
272        }
273        result.insert(
274            self.name.clone(),
275            TaskParameterValue {
276                param_type: TaskParameterType::ChunkInt,
277                value: ExprValue::RangeExpr(self.chunk_range_expr(self.index)),
278            },
279        );
280        self.index += 1;
281        true
282    }
283    fn reset(&mut self) {
284        self.index = 0;
285    }
286}
287
/// Contiguous chunking node: splits values into chunks that respect gaps.
/// Contiguous runs in the source range are identified, then each run is
/// chunked independently. Uses index-based access to avoid materializing values.
struct ContiguousChunkNode {
    /// Parameter name the chunks are emitted under.
    name: String,
    /// The full integer range being chunked.
    range: job::TaskParamRange<i64>,
    /// Target number of values per chunk; `new` clamps this to at least 1.
    default_task_count: usize,
    /// Number of chunks, computed once in `new` by `count_contiguous_chunks_for_range`.
    num_chunks: usize, // cached exact count
    /// Number of individual values in `range`.
    total_len: usize,
}
298
299/// Count contiguous chunks by walking the range's sub-ranges.
300/// For `RangeExpr`, uses the internal `IntRange` structure for O(R) where R is the
301/// number of sub-ranges (not the number of values). For `List`, scans values in O(N).
302fn count_contiguous_chunks_for_range(
303    range: &job::TaskParamRange<i64>,
304    default_task_count: usize,
305) -> usize {
306    match range {
307        job::TaskParamRange::List(v) => {
308            if v.is_empty() {
309                return 0;
310            }
311            let mut total = 0usize;
312            let mut interval_start = 0usize;
313            for i in 0..v.len() - 1 {
314                if v[i + 1] != v[i] + 1 {
315                    let len = i - interval_start + 1;
316                    total += len.div_ceil(default_task_count);
317                    interval_start = i + 1;
318                }
319            }
320            total += (v.len() - interval_start).div_ceil(default_task_count);
321            total
322        }
323        job::TaskParamRange::RangeExpr(r) => {
324            count_contiguous_chunks_from_sub_ranges(r, default_task_count)
325        }
326    }
327}
328
329/// Count contiguous chunks by iterating the `IntRange` sub-ranges of a `RangeExpr`.
330/// Merges adjacent intervals and computes chunk counts arithmetically per interval.
331fn count_contiguous_chunks_from_sub_ranges(r: &RangeExpr, default_task_count: usize) -> usize {
332    let sub_ranges = r.ranges();
333    if sub_ranges.is_empty() {
334        return 0;
335    }
336
337    let mut total_chunks = 0usize;
338    // Track the current merged contiguous interval as (start_val, end_val)
339    let mut interval: Option<(i64, i64)> = None;
340
341    for sr in sub_ranges {
342        if sr.step == 1 {
343            // This sub-range is contiguous: values from sr.start to sr.end
344            match interval {
345                Some((is, ie)) if sr.start == ie + 1 => {
346                    // Extends the current interval
347                    interval = Some((is, sr.end));
348                }
349                Some((is, ie)) => {
350                    // Gap — flush the current interval
351                    let len = (ie - is + 1) as usize;
352                    total_chunks += len.div_ceil(default_task_count);
353                    interval = Some((sr.start, sr.end));
354                }
355                None => {
356                    interval = Some((sr.start, sr.end));
357                }
358            }
359        } else {
360            // Step > 1: each value is isolated (has gaps between them).
361            // We need to check if the first value merges with the current interval,
362            // then each subsequent value is its own interval.
363            let count = sr.len();
364            for idx in 0..count {
365                // SAFETY: idx is bounded by sr.len(), so get() always returns Some.
366                let val = sr.get(idx).expect("index within sub-range bounds");
367                match interval {
368                    Some((is, ie)) if val == ie + 1 => {
369                        interval = Some((is, val));
370                    }
371                    Some((is, ie)) => {
372                        let len = (ie - is + 1) as usize;
373                        total_chunks += len.div_ceil(default_task_count);
374                        interval = Some((val, val));
375                    }
376                    None => {
377                        interval = Some((val, val));
378                    }
379                }
380            }
381        }
382    }
383    // Flush final interval
384    if let Some((is, ie)) = interval {
385        let len = (ie - is + 1) as usize;
386        total_chunks += len.div_ceil(default_task_count);
387    }
388    total_chunks
389}
390
391impl ContiguousChunkNode {
392    fn new(name: String, range: job::TaskParamRange<i64>, default_task_count: usize) -> Self {
393        let total_len = match &range {
394            job::TaskParamRange::List(v) => v.len(),
395            job::TaskParamRange::RangeExpr(r) => r.len(),
396        };
397        let dtc = default_task_count.max(1);
398        let num_chunks = count_contiguous_chunks_for_range(&range, dtc);
399        Self {
400            name,
401            range,
402            default_task_count: dtc,
403            num_chunks,
404            total_len,
405        }
406    }
407}
408
409impl Node for ContiguousChunkNode {
410    fn len(&self) -> usize {
411        self.num_chunks
412    }
413    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {
414        // Sequential-only; use iter()
415    }
416    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
417        let v = params.get(&self.name).ok_or_else(|| {
418            format!(
419                "Parameter '{}' not found in the provided parameters.",
420                self.name
421            )
422        })?;
423        match &v.value {
424            ExprValue::RangeExpr(r) => {
425                // Check by iterating chunks
426                for chunk in ContiguousChunkIterState::new(self) {
427                    if chunk == *r {
428                        return Ok(());
429                    }
430                }
431                Err(format!(
432                    "Parameter '{}' value '{}' is not a valid chunk in the parameter space.",
433                    self.name, r
434                ))
435            }
436            _ => Err(format!(
437                "Parameter '{}' value '{}' is not in the parameter space range.",
438                self.name,
439                v.value.to_display_string()
440            )),
441        }
442    }
443    fn iter(&self) -> Box<dyn NodeIterator> {
444        Box::new(ContiguousChunkNodeIterator {
445            state: ContiguousChunkIterState::new(self),
446            name: self.name.clone(),
447        })
448    }
449}
450
/// Reusable state for iterating contiguous chunks from a range.
/// Finds contiguous intervals, then divides each interval evenly into chunks
/// matching the Python `divide_int_interval_into_chunks` algorithm.
struct ContiguousChunkIterState {
    /// Private copy of the range being chunked.
    range: job::TaskParamRange<i64>,
    /// Target number of values per chunk.
    default_task_count: usize,
    /// Number of individual values in `range`.
    total_len: usize,
    /// Element index in `range` where the next (not yet started) interval begins.
    cursor: usize,
    // Current interval chunking state
    /// First value of the current interval.
    interval_start_val: i64, // first value of current interval
    /// Chunks of the current interval that have not been emitted yet.
    interval_chunks_remaining: usize,
    /// First value of the next chunk to emit within the current interval.
    interval_pos: i64, // next value to emit within interval
    /// Base chunk size for the current interval.
    interval_small: usize,
    /// Number of chunks in the current interval that get one extra value.
    interval_leftovers: usize,
    /// Zero-based index, within the current interval, of the next chunk to emit.
    interval_chunk_index: usize,
    /// Total number of chunks the current interval is divided into.
    interval_chunk_count: usize,
}
468
469impl ContiguousChunkIterState {
470    fn new(node: &ContiguousChunkNode) -> Self {
471        Self {
472            range: node.range.clone(),
473            default_task_count: node.default_task_count,
474            total_len: node.total_len,
475            cursor: 0,
476            interval_start_val: 0,
477            interval_chunks_remaining: 0,
478            interval_pos: 0,
479            interval_small: 0,
480            interval_leftovers: 0,
481            interval_chunk_index: 0,
482            interval_chunk_count: 0,
483        }
484    }
485
486    fn get_value(&self, i: usize) -> i64 {
487        match &self.range {
488            job::TaskParamRange::List(v) => v[i],
489            // i is always bounded by the range length via cursor/total_len checks in callers.
490            job::TaskParamRange::RangeExpr(r) => {
491                r.get(i as i64).expect("index within range bounds")
492            }
493        }
494    }
495
496    /// Find the last index of the contiguous interval starting at `start`.
497    /// For `RangeExpr`, uses sub-range structure to skip step-1 ranges in O(R).
498    /// For `List`, scans values in O(interval_len).
499    fn find_interval_end(&self, start: usize) -> usize {
500        match &self.range {
501            job::TaskParamRange::List(v) => {
502                let mut end = start;
503                while end + 1 < v.len() && v[end + 1] == v[end] + 1 {
504                    end += 1;
505                }
506                end
507            }
508            job::TaskParamRange::RangeExpr(r) => {
509                // Use sub-ranges: find which sub-range contains `start`, then
510                // walk forward through step-1 sub-ranges that are adjacent.
511                let cumulative = r.cumulative_lengths();
512                let sub_ranges = r.ranges();
513
514                // Binary search for the sub-range containing `start`
515                let sr_idx = cumulative.partition_point(|&c| c <= start);
516                let sr_offset = if sr_idx == 0 {
517                    0
518                } else {
519                    cumulative[sr_idx - 1]
520                };
521
522                let sr = &sub_ranges[sr_idx];
523
524                if sr.step != 1 {
525                    // Step > 1: each value is isolated
526                    return start;
527                }
528
529                // Current sub-range is step-1: interval extends to end of this sub-range
530                let mut end = sr_offset + sr.len() - 1;
531
532                // Check subsequent sub-ranges for adjacency
533                let mut last_val = sr.end;
534                for next_sr in &sub_ranges[sr_idx + 1..] {
535                    if next_sr.start == last_val + 1 && next_sr.step == 1 {
536                        end += next_sr.len();
537                        last_val = next_sr.end;
538                    } else if next_sr.start == last_val + 1 && next_sr.step > 1 {
539                        // First value is adjacent, but subsequent values have gaps
540                        end += 1;
541                        break;
542                    } else {
543                        break;
544                    }
545                }
546                end
547            }
548        }
549    }
550
551    /// Advance cursor to find the next contiguous interval and set up chunking state.
552    fn start_next_interval(&mut self) -> bool {
553        if self.cursor >= self.total_len {
554            return false;
555        }
556        let first = self.get_value(self.cursor);
557
558        // Find end of contiguous interval efficiently
559        let end_idx = self.find_interval_end(self.cursor);
560        let last = self.get_value(end_idx);
561        let interval_len = (last - first + 1) as usize;
562        self.cursor = end_idx + 1;
563
564        // Compute even chunk distribution for this interval
565        let chunk_count = interval_len.div_ceil(self.default_task_count);
566        let (small, leftovers) = if chunk_count >= interval_len {
567            (1, 0)
568        } else if chunk_count <= 1 {
569            (interval_len, 0)
570        } else {
571            (interval_len / chunk_count, interval_len % chunk_count)
572        };
573
574        self.interval_start_val = first;
575        self.interval_pos = first;
576        self.interval_chunks_remaining = chunk_count;
577        self.interval_small = small;
578        self.interval_leftovers = leftovers;
579        self.interval_chunk_index = 0;
580        self.interval_chunk_count = chunk_count;
581        true
582    }
583
584    fn next_chunk(&mut self) -> Option<RangeExpr> {
585        // If no chunks remaining in current interval, find next interval
586        while self.interval_chunks_remaining == 0 {
587            if !self.start_next_interval() {
588                return None;
589            }
590        }
591
592        // Compute chunk size using Python's even distribution:
593        // chunk_sizes[(i * chunk_count) // leftovers] += 1
594        let mut size = self.interval_small;
595        if self.interval_leftovers > 0
596            && (self.interval_chunk_index * self.interval_chunk_count) / self.interval_leftovers
597                != ((self.interval_chunk_index + 1) * self.interval_chunk_count)
598                    / self.interval_leftovers
599        {
600            // This is a simpler equivalent: check if this index gets a +1
601            // by testing if floor((i+1)*count/left) > floor(i*count/left)
602        }
603        // Actually, replicate the Python algorithm directly:
604        // chunk_sizes = [small] * chunk_count
605        // for i in range(leftovers): chunk_sizes[(i * chunk_count) // leftovers] += 1
606        // Check if current chunk_index is one of the +1 slots
607        if self.interval_leftovers > 0 {
608            let idx = self.interval_chunk_index;
609            let cc = self.interval_chunk_count;
610            let lo = self.interval_leftovers;
611            // The +1 slots are at indices: (i * cc) // lo for i in 0..lo
612            // Equivalently, idx gets +1 if there exists i such that (i * cc) / lo == idx
613            // which means: idx * lo <= i * cc < (idx + 1) * lo
614            // i.e., ceil(idx * lo / cc) <= i < ceil((idx+1) * lo / cc)
615            // If that range is non-empty, this index gets +1
616            let i_start = (idx * lo).div_ceil(cc);
617            let i_end = ((idx + 1) * lo).div_ceil(cc);
618            if i_start < i_end && i_start < lo {
619                size += 1;
620            }
621        }
622
623        let start = self.interval_pos;
624        let end = start + size as i64 - 1;
625        self.interval_pos = end + 1;
626        self.interval_chunks_remaining -= 1;
627        self.interval_chunk_index += 1;
628
629        let s = format!("{start}-{end}");
630        Some(
631            s.parse::<RangeExpr>()
632                .expect("valid range")
633                .with_contiguous(true),
634        )
635    }
636}
637
638impl Iterator for ContiguousChunkIterState {
639    type Item = RangeExpr;
640    fn next(&mut self) -> Option<RangeExpr> {
641        self.next_chunk()
642    }
643}
644
/// NodeIterator wrapper for ContiguousChunkNode.
struct ContiguousChunkNodeIterator {
    /// Chunk generator that does the actual interval walking.
    state: ContiguousChunkIterState,
    /// Parameter name used as the key when inserting chunks into the result set.
    name: String,
}
650
651impl NodeIterator for ContiguousChunkNodeIterator {
652    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
653        match self.state.next_chunk() {
654            Some(expr) => {
655                result.insert(
656                    self.name.clone(),
657                    TaskParameterValue {
658                        param_type: TaskParameterType::ChunkInt,
659                        value: ExprValue::RangeExpr(expr),
660                    },
661                );
662                true
663            }
664            None => false,
665        }
666    }
667    fn reset(&mut self) {
668        self.state.cursor = 0;
669        self.state.interval_chunks_remaining = 0;
670    }
671}
672
/// Zero-dimensional space: produces one empty parameter set.
struct ZeroDimSpaceNode;

impl Node for ZeroDimSpaceNode {
    /// Always 1: the space holds exactly one (empty) element.
    fn len(&self) -> usize {
        1
    }
    /// There are no parameters to write, so `result` is left untouched.
    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {}
    /// Accepts every parameter set without inspecting it.
    fn validate_containment(&self, _params: &TaskParameterSet) -> Result<(), String> {
        Ok(())
    }
    /// Iterator that advances exactly once and never writes to the result set.
    fn iter(&self) -> Box<dyn NodeIterator> {
        Box::new(IndexedNodeIterator { len: 1, index: 0 })
    }
}
688
/// Wraps a parameter name + pre-materialized list of values.
struct RangeListNode {
    /// Parameter name the values are emitted under.
    name: String,
    /// Type tag attached to every emitted value; `ChunkInt` switches
    /// containment validation to subset checking.
    param_type: TaskParameterType,
    /// The values of this parameter, in iteration order.
    values: Vec<ExprValue>,
}
695
696impl Node for RangeListNode {
697    fn len(&self) -> usize {
698        self.values.len()
699    }
700    fn get(&self, index: usize, result: &mut TaskParameterSet) {
701        result.insert(
702            self.name.clone(),
703            TaskParameterValue {
704                param_type: self.param_type,
705                value: self.values[index].clone(),
706            },
707        );
708    }
709    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
710        let v = params.get(&self.name).ok_or_else(|| {
711            format!(
712                "Parameter '{}' not found in the provided parameters.",
713                self.name
714            )
715        })?;
716        if self.param_type == TaskParameterType::ChunkInt {
717            // Chunk: value must be a RangeExpr whose elements are all in our range
718            match &v.value {
719                ExprValue::RangeExpr(r) => {
720                    for val in r.iter() {
721                        if !self
722                            .values
723                            .iter()
724                            .any(|ev| matches!(ev, ExprValue::Int(i) if *i == val))
725                        {
726                            return Err(format!(
727                                "Parameter '{}' value '{}' is not a subset of the range in the parameter space.",
728                                self.name, r
729                            ));
730                        }
731                    }
732                    Ok(())
733                }
734                _ => Err(format!(
735                    "Parameter '{}' value '{}' is not in the parameter space range.",
736                    self.name,
737                    v.value.to_display_string()
738                )),
739            }
740        } else if !self.values.iter().any(|ev| expr_value_eq(ev, &v.value)) {
741            Err(format!(
742                "Parameter '{}' value '{}' is not in the parameter space range.",
743                self.name,
744                v.value.to_display_string()
745            ))
746        } else {
747            Ok(())
748        }
749    }
750    fn iter(&self) -> Box<dyn NodeIterator> {
751        Box::new(RangeListIterator {
752            name: self.name.clone(),
753            param_type: self.param_type,
754            values: self.values.clone(),
755            index: 0,
756        })
757    }
758}
759
/// Wraps a parameter name + `RangeExpr`; computes values on demand.
struct RangeExprNode {
    /// Parameter name the values are emitted under.
    name: String,
    /// The integer range; individual values are looked up by index when requested.
    range: RangeExpr,
}
765
766impl Node for RangeExprNode {
767    fn len(&self) -> usize {
768        self.range.len()
769    }
770    fn get(&self, index: usize, result: &mut TaskParameterSet) {
771        let val = self
772            .range
773            .get(index as i64)
774            .expect("caller must pass index < self.range.len()");
775        result.insert(
776            self.name.clone(),
777            TaskParameterValue {
778                param_type: TaskParameterType::Int,
779                value: ExprValue::Int(val),
780            },
781        );
782    }
783    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
784        let v = params.get(&self.name).ok_or_else(|| {
785            format!(
786                "Parameter '{}' not found in the provided parameters.",
787                self.name
788            )
789        })?;
790        match &v.value {
791            ExprValue::Int(i) => {
792                if self.range.contains(*i) {
793                    Ok(())
794                } else {
795                    Err(format!(
796                        "Parameter '{}' value '{}' is not in the parameter space range.",
797                        self.name, i
798                    ))
799                }
800            }
801            _ => Err(format!(
802                "Parameter '{}' value '{}' is not in the parameter space range.",
803                self.name,
804                v.value.to_display_string()
805            )),
806        }
807    }
808    fn iter(&self) -> Box<dyn NodeIterator> {
809        Box::new(RangeExprIterator {
810            name: self.name.clone(),
811            range: self.range.clone(),
812            index: 0,
813        })
814    }
815}
816
/// Wraps a parameter name + a fixed chunk layout over an integer range.
///
/// Only the layout (`num_chunks`, `small`, `leftovers`) is stored; the
/// `RangeExpr` for an individual chunk is built on demand by `chunk_range_expr`.
struct StaticChunkNode {
    /// Parameter name the chunks are emitted under.
    name: String,
    /// The full integer range being chunked.
    range: job::TaskParamRange<i64>,
    /// Whether chunks are rendered as contiguous or non-contiguous expressions.
    constraint: RangeConstraint,
    /// Total number of chunks.
    num_chunks: usize,
    small: usize,     // base chunk size = total / num_chunks
    leftovers: usize, // first `leftovers` chunks get size small+1
}
826
827impl StaticChunkNode {
828    /// Build a RangeExpr for chunk `i` on the fly.
829    fn chunk_range_expr(&self, i: usize) -> RangeExpr {
830        build_chunk_range_expr(&self.range, &self.constraint, self.small, self.leftovers, i)
831    }
832}
833
834impl Node for StaticChunkNode {
835    fn len(&self) -> usize {
836        self.num_chunks
837    }
838    fn get(&self, index: usize, result: &mut TaskParameterSet) {
839        result.insert(
840            self.name.clone(),
841            TaskParameterValue {
842                param_type: TaskParameterType::ChunkInt,
843                value: ExprValue::RangeExpr(self.chunk_range_expr(index)),
844            },
845        );
846    }
847    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
848        let v = params.get(&self.name).ok_or_else(|| {
849            format!(
850                "Parameter '{}' not found in the provided parameters.",
851                self.name
852            )
853        })?;
854        match &v.value {
855            ExprValue::RangeExpr(r) => {
856                if (0..self.num_chunks).any(|i| self.chunk_range_expr(i) == *r) {
857                    Ok(())
858                } else {
859                    Err(format!(
860                        "Parameter '{}' value '{}' is not a valid chunk in the parameter space.",
861                        self.name, r
862                    ))
863                }
864            }
865            _ => Err(format!(
866                "Parameter '{}' value '{}' is not in the parameter space range.",
867                self.name,
868                v.value.to_display_string()
869            )),
870        }
871    }
872    fn iter(&self) -> Box<dyn NodeIterator> {
873        Box::new(StaticChunkIterator {
874            name: self.name.clone(),
875            range: self.range.clone(),
876            constraint: self.constraint.clone(),
877            num_chunks: self.num_chunks,
878            small: self.small,
879            leftovers: self.leftovers,
880            index: 0,
881        })
882    }
883}
884
/// Cartesian product of children (rightmost moves fastest).
struct ProductNode {
    // Factors of the product, in the order they appear in the expression.
    children: Vec<Box<dyn Node>>,
    // Cached value reported by `len()`; the constructors in this file fill it
    // with the overflow-checked product of the children's lengths.
    length: usize,
}
890
891impl Node for ProductNode {
892    fn len(&self) -> usize {
893        self.length
894    }
895    fn get(&self, mut index: usize, result: &mut TaskParameterSet) {
896        for child in self.children.iter().rev() {
897            let child_len = child.len();
898            child.get(index % child_len, result);
899            index /= child_len;
900        }
901    }
902    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
903        for child in &self.children {
904            child.validate_containment(params)?;
905        }
906        Ok(())
907    }
908    fn iter(&self) -> Box<dyn NodeIterator> {
909        Box::new(ProductIterator::new(&self.children))
910    }
911}
912
913/// Iterator for ProductNode that composes child iterators.
914/// Non-adaptive children cycle through their values (rightmost fastest);
915/// the adaptive child (if any) advances when non-adaptive children wrap.
916struct ProductIterator {
917    children: Vec<ChildIterator>,
918    started: bool,
919}
920
921struct ChildIterator {
922    iter: Box<dyn NodeIterator>,
923    current: TaskParameterSet,
924}
925
926impl ProductIterator {
927    fn new(children: &[Box<dyn Node>]) -> Self {
928        let children = children
929            .iter()
930            .map(|child| ChildIterator {
931                iter: child.iter(),
932                current: TaskParameterSet::new(),
933            })
934            .collect();
935        Self {
936            children,
937            started: false,
938        }
939    }
940
941    /// Advance the first value from each child. Returns false if any child is empty.
942    fn initialize(&mut self) -> bool {
943        for child in &mut self.children {
944            if !child.iter.next(&mut child.current) {
945                return false;
946            }
947        }
948        true
949    }
950}
951
952impl NodeIterator for ProductIterator {
953    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
954        if !self.started {
955            self.started = true;
956            if !self.initialize() {
957                return false;
958            }
959        } else {
960            // Advance rightmost, carry left
961            let mut carry = true;
962            for child in self.children.iter_mut().rev() {
963                if !carry {
964                    break;
965                }
966                child.current.clear();
967                if child.iter.next(&mut child.current) {
968                    carry = false;
969                } else {
970                    // Exhausted — reset and advance to first value, carry continues
971                    child.iter.reset();
972                    if !child.iter.next(&mut child.current) {
973                        return false;
974                    }
975                }
976            }
977            if carry {
978                return false;
979            }
980        }
981        for child in &self.children {
982            result.extend(child.current.iter().map(|(k, v)| (k.clone(), v.clone())));
983        }
984        true
985    }
986    fn reset(&mut self) {
987        self.started = false;
988        for child in &mut self.children {
989            child.iter.reset();
990            child.current.clear();
991        }
992    }
993}
994
/// Association: all children have the same length, indexed in lockstep.
struct AssociationNode {
    // Members of the association; `get(i)` reads index `i` from every one of them.
    children: Vec<Box<dyn Node>>,
    // Common length shared by the children; this is what `len()` reports.
    length: usize,
}
1000
1001impl Node for AssociationNode {
1002    fn len(&self) -> usize {
1003        self.length
1004    }
1005    fn get(&self, index: usize, result: &mut TaskParameterSet) {
1006        for child in &self.children {
1007            child.get(index, result);
1008        }
1009    }
1010    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1011        // Project `params` onto just this association's keys, so that
1012        // when this association is nested inside a parent (e.g. as a
1013        // child of a Product) the comparison ignores keys belonging to
1014        // sibling branches of the parent expression. Without this
1015        // projection, `params_equal` rejects every candidate on the
1016        // very first length check, since `params` carries the full
1017        // parameter set while `candidate` carries only this
1018        // association's children's keys.
1019        let assoc_keys: std::collections::HashSet<String> = {
1020            let mut ks = std::collections::HashSet::new();
1021            for child in &self.children {
1022                let mut sample = TaskParameterSet::new();
1023                child.get(0, &mut sample);
1024                for k in sample.keys() {
1025                    ks.insert(k.clone());
1026                }
1027            }
1028            ks
1029        };
1030        let projected: TaskParameterSet = params
1031            .iter()
1032            .filter(|(k, _)| assoc_keys.contains(*k))
1033            .map(|(k, v)| (k.clone(), v.clone()))
1034            .collect();
1035
1036        // Linear scan: at least one index must match all children simultaneously
1037        for i in 0..self.length {
1038            let mut candidate = TaskParameterSet::new();
1039            for child in &self.children {
1040                child.get(i, &mut candidate);
1041            }
1042            if params_equal(&candidate, &projected) {
1043                return Ok(());
1044            }
1045        }
1046        // Build a display of the mismatched values
1047        let values: Vec<String> = projected
1048            .iter()
1049            .map(|(k, v)| format!("{}={}", k, v.value.to_display_string()))
1050            .collect();
1051        Err(format!(
1052            "The values {{{}}}, of an association expression in the combination expression, do not appear in the parameter space.",
1053            values.join(", ")
1054        ))
1055    }
1056    fn iter(&self) -> Box<dyn NodeIterator> {
1057        Box::new(AssociationIterator::new(&self.children))
1058    }
1059}
1060
/// Iterator for AssociationNode: lockstep iteration of children.
struct AssociationIterator {
    // One iterator per associated child, each paired with the value it last produced.
    children: Vec<ChildIterator>,
}
1065
1066impl AssociationIterator {
1067    fn new(children: &[Box<dyn Node>]) -> Self {
1068        let children = children
1069            .iter()
1070            .map(|child| ChildIterator {
1071                iter: child.iter(),
1072                current: TaskParameterSet::new(),
1073            })
1074            .collect();
1075        Self { children }
1076    }
1077}
1078
1079impl NodeIterator for AssociationIterator {
1080    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
1081        for child in &mut self.children {
1082            child.current.clear();
1083            if !child.iter.next(&mut child.current) {
1084                return false;
1085            }
1086            result.extend(child.current.iter().map(|(k, v)| (k.clone(), v.clone())));
1087        }
1088        true
1089    }
1090    fn reset(&mut self) {
1091        for child in &mut self.children {
1092            child.iter.reset();
1093            child.current.clear();
1094        }
1095    }
1096}
1097
/// Adaptive chunk node: produces chunks on the fly based on mutable `default_task_count`.
struct AdaptiveChunkNode {
    // Name of the task parameter the chunks are emitted under.
    name: String,
    // Every individual value of the parameter, in iteration order.
    values: Vec<i64>,
    // Shared chunk size; read with a floor of 1 each time a chunk is produced,
    // so updates through the shared `Arc` affect iterators that are already running.
    default_task_count: Arc<AtomicUsize>,
    // Whether each chunk must be a single contiguous run of values.
    range_constraint: RangeConstraint,
}
1105
1106impl Node for AdaptiveChunkNode {
1107    fn len(&self) -> usize {
1108        // Upper bound: one chunk per value. Actual count depends on runtime chunk size.
1109        // Used only for association length validation during construction.
1110        let dtc = self.default_task_count.load(Ordering::Relaxed).max(1);
1111        self.values.len().div_ceil(dtc)
1112    }
1113    fn get(&self, _index: usize, _result: &mut TaskParameterSet) {
1114        // Random access not supported — use iter() instead.
1115    }
1116    fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1117        let v = params.get(&self.name).ok_or_else(|| {
1118            format!(
1119                "Parameter '{}' not found in the provided parameters.",
1120                self.name
1121            )
1122        })?;
1123        match &v.value {
1124            ExprValue::RangeExpr(r) => {
1125                let valid: HashSet<i64> = self.values.iter().copied().collect();
1126                for val in r.iter() {
1127                    if !valid.contains(&val) {
1128                        return Err(format!(
1129                            "Parameter '{}' value '{}' is not a subset of the range in the parameter space.",
1130                            self.name, r
1131                        ));
1132                    }
1133                }
1134                Ok(())
1135            }
1136            _ => Err(format!(
1137                "Parameter '{}' value '{}' is not in the parameter space range.",
1138                self.name,
1139                v.value.to_display_string()
1140            )),
1141        }
1142    }
1143    fn iter(&self) -> Box<dyn NodeIterator> {
1144        Box::new(AdaptiveChunkIterator {
1145            name: self.name.clone(),
1146            values: self.values.clone(),
1147            default_task_count: self.default_task_count.clone(),
1148            range_constraint: self.range_constraint.clone(),
1149            cursor: 0,
1150        })
1151    }
1152}
1153
/// Iterator for adaptive chunk nodes.
struct AdaptiveChunkIterator {
    // Name of the task parameter the chunks are emitted under.
    name: String,
    // Every individual value of the parameter, in iteration order.
    values: Vec<i64>,
    // Shared chunk size, re-read (with a floor of 1) on every call to `next`.
    default_task_count: Arc<AtomicUsize>,
    // Whether each chunk must be a single contiguous run of values.
    range_constraint: RangeConstraint,
    // Index into `values` of the first value not yet emitted.
    cursor: usize,
}
1162
1163impl AdaptiveChunkIterator {
1164    fn make_chunk(&self, slice: &[i64]) -> RangeExpr {
1165        let range_str = match self.range_constraint {
1166            RangeConstraint::Contiguous => {
1167                if slice.len() == 1 {
1168                    slice[0].to_string()
1169                } else {
1170                    format!("{}-{}", slice[0], slice[slice.len() - 1])
1171                }
1172            }
1173            RangeConstraint::Noncontiguous => compress_range_expr(slice),
1174        };
1175        let expr = range_str
1176            .parse::<RangeExpr>()
1177            .expect("range string built from valid integers");
1178        match self.range_constraint {
1179            RangeConstraint::Contiguous => expr.with_contiguous(true),
1180            RangeConstraint::Noncontiguous => expr,
1181        }
1182    }
1183}
1184
1185impl NodeIterator for AdaptiveChunkIterator {
1186    fn next(&mut self, result: &mut TaskParameterSet) -> bool {
1187        if self.cursor >= self.values.len() {
1188            return false;
1189        }
1190        let chunk_size = self.default_task_count.load(Ordering::Relaxed).max(1);
1191        let chunk = match self.range_constraint {
1192            RangeConstraint::Contiguous => {
1193                let start = self.cursor;
1194                let mut end = start + 1;
1195                while end < self.values.len()
1196                    && end - start < chunk_size
1197                    && self.values[end] == self.values[end - 1] + 1
1198                {
1199                    end += 1;
1200                }
1201                let slice = &self.values[start..end];
1202                self.cursor = end;
1203                self.make_chunk(slice)
1204            }
1205            RangeConstraint::Noncontiguous => {
1206                let end = (self.cursor + chunk_size).min(self.values.len());
1207                let slice = &self.values[self.cursor..end];
1208                self.cursor = end;
1209                self.make_chunk(slice)
1210            }
1211        };
1212        result.insert(
1213            self.name.clone(),
1214            TaskParameterValue {
1215                param_type: TaskParameterType::ChunkInt,
1216                value: ExprValue::RangeExpr(chunk),
1217            },
1218        );
1219        true
1220    }
1221    fn reset(&mut self) {
1222        self.cursor = 0;
1223    }
1224}
1225
1226// ── Public API ──
1227
/// Lazy iterator over a resolved step parameter space.
pub struct StepParameterSpaceIterator {
    /// Root of the node tree built from the combination expression.
    root: Box<dyn Node>,
    /// Names of all task parameters defined by the space.
    names: HashSet<String>,
    /// Cursor used by `Iterator::next`; set back to 0 by `reset`.
    current_index: usize,
    /// True when one chunk parameter uses adaptive chunking.
    adaptive: bool,
    /// Shared chunk size of the adaptive parameter; `None` when not adaptive.
    adaptive_chunk_size: Option<Arc<AtomicUsize>>,
    /// Live node iterator; present only when `sequential` is true.
    node_iter: Option<Box<dyn NodeIterator>>,
    /// Name of the adaptive chunk parameter; `None` when not adaptive.
    chunks_param_name: Option<String>,
    /// True when iteration must be sequential (adaptive or contiguous chunking).
    sequential: bool,
}
1240
1241impl StepParameterSpaceIterator {
1242    /// Construct from a resolved `StepParameterSpace`.
1243    pub fn new(space: &job::StepParameterSpace) -> Result<Self, ModelError> {
1244        Self::new_inner(space, None)
1245    }
1246
1247    /// Create with an explicit chunk task count override.
1248    /// When `Some(1)`, disables adaptive chunking and counts individual tasks.
1249    pub fn new_with_chunk_override(
1250        space: &job::StepParameterSpace,
1251        override_count: Option<usize>,
1252    ) -> Result<Self, ModelError> {
1253        Self::new_inner(space, override_count)
1254    }
1255
1256    fn new_inner(
1257        space: &job::StepParameterSpace,
1258        chunk_override: Option<usize>,
1259    ) -> Result<Self, ModelError> {
1260        let names: HashSet<String> = space.task_parameter_definitions.keys().cloned().collect();
1261
1262        if space.task_parameter_definitions.is_empty() {
1263            return Ok(Self {
1264                root: Box::new(ZeroDimSpaceNode),
1265                names,
1266                current_index: 0,
1267                adaptive: false,
1268                adaptive_chunk_size: None,
1269                node_iter: None,
1270                chunks_param_name: None,
1271                sequential: false,
1272            });
1273        }
1274
1275        let expr = space.combination.as_deref().unwrap_or("*");
1276
1277        // Check if any parameter needs adaptive chunking
1278        let mut adaptive_info: Option<(String, Arc<AtomicUsize>)> = None;
1279        if chunk_override.is_none() {
1280            for (name, param) in &space.task_parameter_definitions {
1281                if let job::TaskParameter::ChunkInt { chunks, .. } = param {
1282                    if chunks.target_runtime_seconds.is_some_and(|t| t > 0) {
1283                        let arc = Arc::new(AtomicUsize::new(chunks.default_task_count.max(1)));
1284                        adaptive_info = Some((name.clone(), arc));
1285                        break;
1286                    }
1287                }
1288            }
1289        }
1290
1291        let root = if expr.trim() == "*" {
1292            // Default: no explicit combination — product of all params in definition order
1293            let mut children: Vec<Box<dyn Node>> = Vec::new();
1294            let mut adaptive_idx = None;
1295            for (i, name) in space.task_parameter_definitions.keys().enumerate() {
1296                if adaptive_info.as_ref().is_some_and(|(n, _)| n == name) {
1297                    adaptive_idx = Some(i);
1298                }
1299                children.push(make_leaf_node(name, space, &adaptive_info, chunk_override)?);
1300            }
1301            // Move adaptive child to the end (innermost/fastest-varying) to match Python
1302            if let Some(idx) = adaptive_idx {
1303                let child = children.remove(idx);
1304                children.push(child);
1305            }
1306            if children.len() == 1 {
1307                // SAFETY: We just checked len() == 1, so into_iter().next() always
1308                // returns Some. Using into_iter avoids an unwrap on pop().
1309                children
1310                    .into_iter()
1311                    .next()
1312                    .expect("non-empty vec with len 1")
1313            } else {
1314                let length = checked_product_len(&children)?;
1315                Box::new(ProductNode { children, length })
1316            }
1317        } else {
1318            let tokens = tokenize(expr);
1319            parse_node_expr(&tokens, space, &adaptive_info, chunk_override)?
1320        };
1321
1322        let adaptive = adaptive_info.is_some();
1323        let chunks_param_name = adaptive_info.as_ref().map(|(n, _)| n.clone());
1324        let adaptive_chunk_size = adaptive_info.map(|(_, rc)| rc);
1325
1326        // Use iterator path if any node requires sequential iteration
1327        // (adaptive chunking or contiguous chunking with gaps)
1328        let needs_sequential = adaptive || has_contiguous_chunks(space);
1329        let node_iter = if needs_sequential {
1330            Some(root.iter())
1331        } else {
1332            None
1333        };
1334
1335        Ok(Self {
1336            root,
1337            names,
1338            current_index: 0,
1339            adaptive,
1340            adaptive_chunk_size,
1341            node_iter,
1342            chunks_param_name,
1343            sequential: needs_sequential,
1344        })
1345    }
1346
1347    pub fn names(&self) -> &HashSet<String> {
1348        &self.names
1349    }
1350
1351    pub fn len(&self) -> usize {
1352        if self.adaptive {
1353            0
1354        } else {
1355            self.root.len()
1356        }
1357    }
1358
1359    pub fn is_empty(&self) -> bool {
1360        if self.adaptive {
1361            false
1362        } else {
1363            self.root.len() == 0
1364        }
1365    }
1366
1367    /// Random access to a specific task parameter set by index.
1368    /// Returns `None` for out-of-bounds or when sequential iteration is required.
1369    pub fn get(&self, index: usize) -> Option<TaskParameterSet> {
1370        if self.sequential {
1371            return None;
1372        }
1373        if index >= self.root.len() {
1374            return None;
1375        }
1376        let mut result = TaskParameterSet::new();
1377        self.root.get(index, &mut result);
1378        Some(result)
1379    }
1380
1381    /// Check if a parameter set is contained in this space.
1382    pub fn contains(&self, params: &TaskParameterSet) -> bool {
1383        self.validate_containment(params).is_ok()
1384    }
1385
1386    /// Validate that a parameter set is contained in this space.
1387    /// Returns a detailed error message if not.
1388    pub fn validate_containment(&self, params: &TaskParameterSet) -> Result<(), String> {
1389        let mut params_keys: Vec<&str> = params.keys().map(|s| s.as_str()).collect();
1390        let mut space_keys: Vec<&str> = self.names.iter().map(|s| s.as_str()).collect();
1391        params_keys.sort();
1392        space_keys.sort();
1393        if params_keys != space_keys {
1394            return Err(format!(
1395                "Task parameter names {:?} do not match the parameter space names {:?}.",
1396                params_keys, space_keys
1397            ));
1398        }
1399        self.root.validate_containment(params)
1400    }
1401
1402    /// Whether adaptive chunking is active.
1403    pub fn chunks_adaptive(&self) -> bool {
1404        self.adaptive
1405    }
1406
1407    /// The parameter name used for chunking, if any.
1408    pub fn chunks_parameter_name(&self) -> Option<&str> {
1409        self.chunks_param_name.as_deref()
1410    }
1411
1412    /// Current default_task_count for adaptive chunking.
1413    pub fn chunks_default_task_count(&self) -> Option<usize> {
1414        self.adaptive_chunk_size
1415            .as_ref()
1416            .map(|a| a.load(Ordering::Relaxed))
1417    }
1418
1419    /// Update the chunk size for adaptive chunking.
1420    pub fn set_chunks_default_task_count(&mut self, value: usize) {
1421        if let Some(ref a) = self.adaptive_chunk_size {
1422            a.store(value, Ordering::Relaxed);
1423            // The Arc<AtomicUsize> propagates to the live iterator — no reset needed.
1424        }
1425    }
1426
1427    /// Rewind the iterator to the beginning so a fresh `Iterator::next`
1428    /// walk yields the same elements again.
1429    ///
1430    /// For non-sequential (random-access) iterators this resets the
1431    /// internal cursor used by `Iterator::next` to 0. For sequential
1432    /// (adaptive or contiguous-with-gaps) iterators it delegates to the
1433    /// inner node iterator's `reset()`. The adaptive `Arc<AtomicUsize>`
1434    /// chunk size is preserved across resets — `reset()` does not undo
1435    /// `set_chunks_default_task_count`.
1436    pub fn reset(&mut self) {
1437        self.current_index = 0;
1438        if let Some(iter) = self.node_iter.as_mut() {
1439            iter.reset();
1440        }
1441    }
1442}
1443
1444fn params_equal(a: &TaskParameterSet, b: &TaskParameterSet) -> bool {
1445    if a.len() != b.len() {
1446        return false;
1447    }
1448    a.iter().all(|(k, v)| {
1449        b.get(k)
1450            .is_some_and(|bv| expr_value_eq(&v.value, &bv.value))
1451    })
1452}
1453
1454fn expr_value_eq(a: &ExprValue, b: &ExprValue) -> bool {
1455    match (a, b) {
1456        (ExprValue::Int(x), ExprValue::Int(y)) => x == y,
1457        (ExprValue::Float(x), ExprValue::Float(y)) => x.value() == y.value(),
1458        (ExprValue::String(x), ExprValue::String(y)) => x == y,
1459        (ExprValue::RangeExpr(x), ExprValue::RangeExpr(y)) => x == y,
1460        (ExprValue::Path { value: x, .. }, ExprValue::Path { value: y, .. }) => x == y,
1461        (ExprValue::String(x), ExprValue::Path { value: y, .. }) => x == y,
1462        (ExprValue::Path { value: x, .. }, ExprValue::String(y)) => x == y,
1463        _ => false,
1464    }
1465}
1466
1467impl Iterator for StepParameterSpaceIterator {
1468    type Item = TaskParameterSet;
1469    fn next(&mut self) -> Option<TaskParameterSet> {
1470        if self.sequential {
1471            let iter = self.node_iter.as_mut()?;
1472            let mut result = TaskParameterSet::new();
1473            if iter.next(&mut result) {
1474                Some(result)
1475            } else {
1476                None
1477            }
1478        } else {
1479            let item = self.get(self.current_index)?;
1480            self.current_index += 1;
1481            Some(item)
1482        }
1483    }
1484
1485    fn size_hint(&self) -> (usize, Option<usize>) {
1486        if self.adaptive {
1487            (0, None)
1488        } else {
1489            let remaining = self.root.len().saturating_sub(self.current_index);
1490            (remaining, Some(remaining))
1491        }
1492    }
1493}
1494
1495// ── Node construction from combination expression ──
1496
1497fn parse_node_expr(
1498    tokens: &[String],
1499    space: &job::StepParameterSpace,
1500    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1501    chunk_override: Option<usize>,
1502) -> Result<Box<dyn Node>, ModelError> {
1503    let mut pos = 0;
1504    let result = parse_node_product(tokens, &mut pos, space, adaptive_info, chunk_override)?;
1505    if pos < tokens.len() {
1506        return Err(ModelError::DecodeValidation(format!(
1507            "Unexpected token '{}' in combination expression",
1508            tokens[pos]
1509        )));
1510    }
1511    Ok(result)
1512}
1513
1514fn parse_node_product(
1515    tokens: &[String],
1516    pos: &mut usize,
1517    space: &job::StepParameterSpace,
1518    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1519    chunk_override: Option<usize>,
1520) -> Result<Box<dyn Node>, ModelError> {
1521    let mut children = vec![parse_node_element(
1522        tokens,
1523        pos,
1524        space,
1525        adaptive_info,
1526        chunk_override,
1527    )?];
1528    while *pos < tokens.len() && tokens[*pos] == "*" {
1529        *pos += 1;
1530        children.push(parse_node_element(
1531            tokens,
1532            pos,
1533            space,
1534            adaptive_info,
1535            chunk_override,
1536        )?);
1537    }
1538    if children.len() == 1 {
1539        // SAFETY: We just checked len() == 1, so into_iter().next() always
1540        // returns Some. Using into_iter avoids an unwrap on pop().
1541        Ok(children
1542            .into_iter()
1543            .next()
1544            .expect("non-empty vec with len 1"))
1545    } else {
1546        let length = checked_product_len(&children)?;
1547        Ok(Box::new(ProductNode { children, length }))
1548    }
1549}
1550
1551fn parse_node_element(
1552    tokens: &[String],
1553    pos: &mut usize,
1554    space: &job::StepParameterSpace,
1555    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1556    chunk_override: Option<usize>,
1557) -> Result<Box<dyn Node>, ModelError> {
1558    if *pos >= tokens.len() {
1559        return Err(ModelError::DecodeValidation(
1560            "Unexpected end of combination expression".into(),
1561        ));
1562    }
1563    if tokens[*pos] == "(" {
1564        *pos += 1;
1565        let mut children = vec![parse_node_product(
1566            tokens,
1567            pos,
1568            space,
1569            adaptive_info,
1570            chunk_override,
1571        )?];
1572        while *pos < tokens.len() && tokens[*pos] == "," {
1573            *pos += 1;
1574            children.push(parse_node_product(
1575                tokens,
1576                pos,
1577                space,
1578                adaptive_info,
1579                chunk_override,
1580            )?);
1581        }
1582        if *pos >= tokens.len() || tokens[*pos] != ")" {
1583            return Err(ModelError::DecodeValidation(
1584                "Missing closing parenthesis in combination".into(),
1585            ));
1586        }
1587        *pos += 1;
1588        let length = children[0].len();
1589        for child in children.iter().skip(1) {
1590            if child.len() != length {
1591                return Err(ModelError::DecodeValidation(format!(
1592                    "Associative combination: all members must have the same number of values, got {} and {}",
1593                    length, child.len()
1594                )));
1595            }
1596        }
1597        if children.len() == 1 {
1598            Err(ModelError::DecodeValidation(
1599                "Association expression must have more than one term.".into(),
1600            ))
1601        } else {
1602            Ok(Box::new(AssociationNode { children, length }))
1603        }
1604    } else {
1605        let name = &tokens[*pos];
1606        *pos += 1;
1607        make_leaf_node(name, space, adaptive_info, chunk_override)
1608    }
1609}
1610
1611/// Create a leaf node for a parameter name from the resolved definitions.
1612fn make_leaf_node(
1613    name: &str,
1614    space: &job::StepParameterSpace,
1615    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1616    chunk_override: Option<usize>,
1617) -> Result<Box<dyn Node>, ModelError> {
1618    let param = space.task_parameter_definitions.get(name).ok_or_else(|| {
1619        ModelError::DecodeValidation(format!(
1620            "Unknown parameter '{name}' in combination expression"
1621        ))
1622    })?;
1623
1624    match param {
1625        job::TaskParameter::Int { range, chunks } => {
1626            if let Some(chunk_cfg) = chunks {
1627                return make_chunk_node(name, range, chunk_cfg, adaptive_info, chunk_override);
1628            }
1629            match range {
1630                job::TaskParamRange::List(v) => Ok(Box::new(RangeListNode {
1631                    name: name.to_string(),
1632                    param_type: TaskParameterType::Int,
1633                    values: v.iter().map(|&i| ExprValue::Int(i)).collect(),
1634                })),
1635                job::TaskParamRange::RangeExpr(r) => Ok(Box::new(RangeExprNode {
1636                    name: name.to_string(),
1637                    range: r.clone(),
1638                })),
1639            }
1640        }
1641        job::TaskParameter::Float { range } => Ok(Box::new(RangeListNode {
1642            name: name.to_string(),
1643            param_type: TaskParameterType::Float,
1644            values: range
1645                .iter()
1646                .map(|&f| {
1647                    Float64::new(f).map(ExprValue::Float).map_err(|_| {
1648                        ModelError::DecodeValidation(format!(
1649                            "Parameter '{name}': float value {f} is not finite"
1650                        ))
1651                    })
1652                })
1653                .collect::<Result<Vec<_>, _>>()?,
1654        })),
1655        job::TaskParameter::String { range } => Ok(Box::new(RangeListNode {
1656            name: name.to_string(),
1657            param_type: TaskParameterType::String,
1658            values: range.iter().map(|s| ExprValue::String(s.clone())).collect(),
1659        })),
1660        job::TaskParameter::Path { range } => Ok(Box::new(RangeListNode {
1661            name: name.to_string(),
1662            param_type: TaskParameterType::Path,
1663            values: range.iter().map(|s| ExprValue::String(s.clone())).collect(),
1664        })),
1665        job::TaskParameter::ChunkInt { range, chunks } => {
1666            make_chunk_node(name, range, chunks, adaptive_info, chunk_override)
1667        }
1668    }
1669}
1670
1671/// Check if any chunk parameter uses contiguous constraint (requires sequential iteration).
1672fn has_contiguous_chunks(space: &job::StepParameterSpace) -> bool {
1673    space.task_parameter_definitions.values().any(|p| {
1674        matches!(
1675            p,
1676            job::TaskParameter::ChunkInt { chunks, .. }
1677                if chunks.range_constraint == RangeConstraint::Contiguous
1678        )
1679    })
1680}
1681
1682/// Build a chunk node from a range and chunk config. Creates `AdaptiveChunkNode` when
1683/// `target_runtime_seconds > 0`, `ContiguousChunkNode` for contiguous static chunking,
1684/// or `StaticChunkNode` for noncontiguous static chunking.
1685fn make_chunk_node(
1686    name: &str,
1687    range: &job::TaskParamRange<i64>,
1688    chunks: &job::ResolvedChunks,
1689    adaptive_info: &Option<(String, Arc<AtomicUsize>)>,
1690    chunk_override: Option<usize>,
1691) -> Result<Box<dyn Node>, ModelError> {
1692    // Check if this parameter should use adaptive chunking
1693    if let Some((adaptive_name, rc)) = adaptive_info {
1694        if adaptive_name == name {
1695            let values: Vec<i64> = match range {
1696                job::TaskParamRange::List(v) => v.clone(),
1697                job::TaskParamRange::RangeExpr(r) => r.iter().collect(),
1698            };
1699            return Ok(Box::new(AdaptiveChunkNode {
1700                name: name.to_string(),
1701                values,
1702                default_task_count: rc.clone(),
1703                range_constraint: chunks.range_constraint.clone(),
1704            }));
1705        }
1706    }
1707
1708    // Use override if provided, otherwise use the template's default
1709    let default_task_count = chunk_override.unwrap_or(chunks.default_task_count).max(1);
1710
1711    let total_len = match range {
1712        job::TaskParamRange::List(v) => v.len(),
1713        job::TaskParamRange::RangeExpr(r) => r.len(),
1714    };
1715    if total_len == 0 {
1716        return Ok(Box::new(RangeListNode {
1717            name: name.to_string(),
1718            param_type: TaskParameterType::ChunkInt,
1719            values: Vec::new(),
1720        }));
1721    }
1722
1723    // Contiguous chunking must respect gaps in the source range
1724    if chunks.range_constraint == RangeConstraint::Contiguous {
1725        return Ok(Box::new(ContiguousChunkNode::new(
1726            name.to_string(),
1727            range.clone(),
1728            default_task_count,
1729        )));
1730    }
1731
1732    let chunk_count = total_len.div_ceil(default_task_count);
1733    let small = total_len / chunk_count;
1734    let leftovers = total_len % chunk_count;
1735
1736    Ok(Box::new(StaticChunkNode {
1737        name: name.to_string(),
1738        range: range.clone(),
1739        constraint: chunks.range_constraint.clone(),
1740        num_chunks: chunk_count,
1741        small,
1742        leftovers,
1743    }))
1744}
1745
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_compress_range_expr() {
        // Degenerate inputs first, then isolated values, then runs with gaps.
        assert_eq!(compress_range_expr(&[]), "");
        assert_eq!(compress_range_expr(&[1]), "1");
        assert_eq!(compress_range_expr(&[1, 3]), "1,3");
        assert_eq!(compress_range_expr(&[1, 2, 3]), "1-3");
        assert_eq!(compress_range_expr(&[1, 2, 3, 5, 7, 8, 9]), "1-3,5,7-9");
    }

    #[test]
    fn test_tokenize() {
        let product = tokenize("A * B");
        assert_eq!(product, vec!["A", "*", "B"]);

        let association = tokenize("(A, B) * C");
        assert_eq!(association, vec!["(", "A", ",", "B", ")", "*", "C"]);

        let single = tokenize("A");
        assert_eq!(single, vec!["A"]);
    }

    // ── Helpers to build test spaces and parameter sets ──

    /// Assemble a parameter space from `(name, definition)` pairs, keeping the
    /// given order, plus an optional combination expression.
    fn make_space(
        params: Vec<(&str, job::TaskParameter)>,
        combination: Option<&str>,
    ) -> job::StepParameterSpace {
        let task_parameter_definitions: indexmap::IndexMap<_, _> = params
            .into_iter()
            .map(|(name, param)| (name.to_string(), param))
            .collect();
        job::StepParameterSpace {
            task_parameter_definitions,
            combination: combination.map(String::from),
        }
    }

    /// Unchunked `Int` parameter over an explicit list of values.
    fn int_param(values: Vec<i64>) -> job::TaskParameter {
        let range = job::TaskParamRange::List(values);
        job::TaskParameter::Int {
            range,
            chunks: None,
        }
    }

    /// Noncontiguous `ChunkInt` parameter over an explicit list, with a target
    /// runtime set so that adaptive chunking is selected.
    fn adaptive_chunk_param(values: Vec<i64>, default_task_count: usize) -> job::TaskParameter {
        let chunks = job::ResolvedChunks {
            default_task_count,
            target_runtime_seconds: Some(60), // >0 triggers adaptive
            range_constraint: RangeConstraint::Noncontiguous,
        };
        job::TaskParameter::ChunkInt {
            range: job::TaskParamRange::List(values),
            chunks,
        }
    }

    /// Unchunked `Int` parameter over a parsed range expression.
    fn range_expr_param(expr: &str) -> job::TaskParameter {
        let parsed: RangeExpr = expr.parse().unwrap();
        job::TaskParameter::Int {
            range: job::TaskParamRange::RangeExpr(parsed),
            chunks: None,
        }
    }

    /// Contiguous `ChunkInt` parameter over a parsed range expression, with no
    /// target runtime (non-adaptive).
    fn static_chunk_param(expr: &str, default_task_count: usize) -> job::TaskParameter {
        let parsed: RangeExpr = expr.parse().unwrap();
        let chunks = job::ResolvedChunks {
            default_task_count,
            target_runtime_seconds: None,
            range_constraint: RangeConstraint::Contiguous,
        };
        job::TaskParameter::ChunkInt {
            range: job::TaskParamRange::RangeExpr(parsed),
            chunks,
        }
    }

    /// Wrap a value together with its declared parameter type.
    fn tpv(param_type: TaskParameterType, value: ExprValue) -> TaskParameterValue {
        TaskParameterValue { param_type, value }
    }

    /// Build a task parameter set from `(name, value)` pairs.
    fn param_set(entries: Vec<(&str, TaskParameterValue)>) -> TaskParameterSet {
        let mut set = TaskParameterSet::new();
        for (name, value) in entries {
            set.insert(name.into(), value);
        }
        set
    }

    // ── Laziness tests ──
    // These use a 100-billion-element RangeExpr. If any code path eagerly
    // materializes the range, the test will OOM or hang — proving non-laziness.

    const HUGE_RANGE: &str = "1-100000000000";

    #[test]
    fn test_lazy_construction_range_expr() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert_eq!(space_iter.len(), 100_000_000_000);
    }

    #[test]
    fn test_lazy_random_access_range_expr() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        // First and last index of the range.
        for (index, expected) in [(0, 1), (99_999_999_999, 100_000_000_000)] {
            let params = space_iter.get(index).unwrap();
            assert_eq!(params["X"].value, ExprValue::Int(expected));
        }
    }

    #[test]
    fn test_lazy_product_with_huge_range() {
        let definitions = vec![
            ("A", int_param(vec![1, 2])),
            ("X", range_expr_param(HUGE_RANGE)),
        ];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert_eq!(space_iter.len(), 200_000_000_000);

        // Random access into the middle
        let middle = space_iter.get(50_000_000_000).unwrap();
        for key in ["A", "X"] {
            assert!(middle.contains_key(key));
        }
    }

    #[test]
    fn test_lazy_iterate_first_few_of_huge_range() {
        let space = make_space(vec![("X", range_expr_param(HUGE_RANGE))], None);
        let mut space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        // The first two items carry X=1 and X=2.
        for expected in 1..=2 {
            let params = space_iter.next().unwrap();
            assert_eq!(params["X"].value, ExprValue::Int(expected));
        }
    }

    #[test]
    fn test_lazy_product_iterate_first_few() {
        let definitions = vec![
            ("A", int_param(vec![10, 20])),
            ("X", range_expr_param(HUGE_RANGE)),
        ];
        let space = make_space(definitions, None);
        let mut space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        // `make_space` stores definitions in an IndexMap, so their order is
        // stable; this test still only checks that both parameters are present.
        let head = space_iter.next().unwrap();
        assert!(head.contains_key("A"));
        assert!(head.contains_key("X"));

        // Just verify we can get a few without hanging
        for _step in 0..10 {
            assert!(space_iter.next().is_some());
        }
    }

    #[test]
    fn test_lazy_static_chunk_with_huge_range() {
        // 100B items / 1000 per chunk = 100M chunks — construction must be lazy
        let space = make_space(vec![("C", static_chunk_param(HUGE_RANGE, 1000))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert_eq!(space_iter.len(), 100_000_000);

        // Iterate first few chunks
        let leading: Vec<_> = space_iter.take(3).collect();
        assert_eq!(leading.len(), 3);
        assert!(leading[0].contains_key("C"));
    }

    #[test]
    fn test_lazy_iter_of_product_with_huge_range() {
        // Tests that ProductNode::iter() doesn't materialize the huge child
        let definitions = vec![
            ("A", int_param(vec![1, 2])),
            ("X", range_expr_param(HUGE_RANGE)),
            ("Chunk", adaptive_chunk_param(vec![10, 20, 30, 40], 2)),
        ];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(space_iter.chunks_adaptive());

        // Iterate a few — must not OOM from materializing X's 100B values
        let mut seen = 0;
        for params in space_iter.take(5) {
            for key in ["A", "X", "Chunk"] {
                assert!(params.contains_key(key));
            }
            seen += 1;
        }
        assert_eq!(seen, 5);
    }

    // ── Adaptive chunking tests ──

    #[test]
    fn test_len_returns_zero_for_adaptive_chunking() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 2))];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(space_iter.chunks_adaptive());
        assert_eq!(space_iter.len(), 0);
    }

    #[test]
    fn test_get_returns_none_for_adaptive_chunking() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 2))];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(space_iter.chunks_adaptive());
        assert!(space_iter.get(0).is_none());
    }

    #[test]
    fn test_adaptive_chunking_with_multiple_params_iterates() {
        let definitions = vec![
            ("Frame", int_param(vec![1, 2])),
            ("Chunk", adaptive_chunk_param(vec![10, 20, 30, 40], 2)),
        ];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        assert!(space_iter.chunks_adaptive());

        // Cap the walk at 101 items so a runaway iterator fails the final
        // assertion instead of hanging the test.
        let mut yielded = 0;
        for params in space_iter.take(101) {
            assert!(params.contains_key("Frame"));
            assert!(params.contains_key("Chunk"));
            yielded += 1;
        }
        assert_eq!(yielded, 4);
    }

    #[test]
    fn test_adaptive_chunking_single_param_iterates() {
        let definitions = vec![("Chunk", adaptive_chunk_param(vec![1, 2, 3, 4, 5, 6], 3))];
        let space = make_space(definitions, None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let collected: Vec<_> = space_iter.collect();
        assert_eq!(collected.len(), 2);
    }

    #[test]
    fn test_adaptive_with_association_iterates() {
        let definitions = vec![
            ("Frame", int_param(vec![1, 2])),
            ("Chunk", adaptive_chunk_param(vec![10, 20], 1)),
        ];
        let space = make_space(definitions, Some("(Frame, Chunk)"));
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let collected: Vec<_> = space_iter.collect();
        assert_eq!(collected.len(), 2);
    }

    // ── validate_containment tests ──

    #[test]
    fn test_validate_containment_name_mismatch() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = param_set(vec![(
            "Wrong",
            tpv(TaskParameterType::Int, ExprValue::Int(1)),
        )]);

        let err = space_iter.validate_containment(&params).unwrap_err();
        for needle in ["do not match", "Wrong", "Frame"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_value_not_in_range() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = param_set(vec![(
            "Frame",
            tpv(TaskParameterType::Int, ExprValue::Int(99)),
        )]);

        let err = space_iter.validate_containment(&params).unwrap_err();
        for needle in ["Frame", "99", "not in the parameter space range"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_range_expr_value_not_in_range() {
        let space = make_space(vec![("X", range_expr_param("1-10"))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = param_set(vec![(
            "X",
            tpv(TaskParameterType::Int, ExprValue::Int(99)),
        )]);

        let err = space_iter.validate_containment(&params).unwrap_err();
        for needle in ["X", "99", "not in the parameter space range"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    #[test]
    fn test_validate_containment_success() {
        let space = make_space(vec![("Frame", int_param(vec![1, 2, 3]))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();
        let params = param_set(vec![(
            "Frame",
            tpv(TaskParameterType::Int, ExprValue::Int(2)),
        )]);

        assert!(space_iter.validate_containment(&params).is_ok());
    }

    #[test]
    fn test_validate_containment_association_not_found() {
        let definitions = vec![("A", int_param(vec![1, 2])), ("B", int_param(vec![10, 20]))];
        let space = make_space(definitions, Some("(A, B)"));
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        // A=1,B=20 is not a valid association pair (valid: A=1,B=10 and A=2,B=20)
        let params = param_set(vec![
            ("A", tpv(TaskParameterType::Int, ExprValue::Int(1))),
            ("B", tpv(TaskParameterType::Int, ExprValue::Int(20))),
        ]);

        let err = space_iter.validate_containment(&params).unwrap_err();
        assert!(err.contains("association"), "got: {err}");
    }

    #[test]
    fn test_validate_containment_chunk_not_subset() {
        let space = make_space(vec![("C", static_chunk_param("1-10", 5))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        // Chunk "1-99" is not a subset of range 1-10
        let oversized: RangeExpr = "1-99".parse().unwrap();
        let params = param_set(vec![(
            "C",
            tpv(TaskParameterType::ChunkInt, ExprValue::RangeExpr(oversized)),
        )]);

        let err = space_iter.validate_containment(&params).unwrap_err();
        for needle in ["C", "not"] {
            assert!(err.contains(needle), "got: {err}");
        }
    }

    // ── F2: get_value with range_expr returns values without panic ──

    #[test]
    fn test_contiguous_chunk_stepped_range_iterates_without_panic() {
        // Stepped range (step=2) exercises the sr.get(idx) path in
        // count_contiguous_chunks_for_range and ContiguousChunkIterState::get_value
        let space = make_space(vec![("C", static_chunk_param("1-10:2", 2))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        let chunks: Vec<_> = space_iter.collect();
        assert!(!chunks.is_empty(), "should produce at least one chunk");
        assert!(chunks.iter().all(|chunk| chunk.contains_key("C")));
    }

    #[test]
    fn test_range_expr_random_access_does_not_panic() {
        // Exercises RangeExprNode::get which calls r.get(i as i64)
        let space = make_space(vec![("X", range_expr_param("1-5"))], None);
        let space_iter = StepParameterSpaceIterator::new(&space).unwrap();

        for index in 0..5 {
            let params = space_iter.get(index).unwrap();
            // The range starts at 1, so position `index` holds `index + 1`.
            let expected = ExprValue::Int(index as i64 + 1);
            assert_eq!(params["X"].value, expected);
        }
    }
}