polaris_graph 0.4.4

Graph execution primitives for Polaris (Layer 2).
//! Integration tests demonstrating compositional graph building.
//!
//! Tests are organized by complexity level, with each level building on the previous:
//! 1. **Primitives**: Basic systems and execution tracking via `SystemInfo`
//! 2. **Atomic Builders**: Single control flow constructs (parallel, decision, loop, switch)
//! 3. **Composite Builders**: Nested control flow (decision containing parallel, etc.)
//! 4. **Complex Compositions**: Multi-level nesting to verify arbitrary depth execution
//! 5. **Property-Based**: Random fragment trees verified against predicted execution counts
//!
//! ## Fragment DSL
//!
//! The `Fragment` enum is a declarative DSL for building test graphs. Each variant maps to
//! a graph construct (node type or control flow pattern). Fragments are recursively built
//! into a `Graph`, and each `Track` leaf registers a `LoggingSystem` whose `NodeId` is
//! collected into a `TrackerNodes` list. After execution, the test verifies that each
//! tracker node fired the expected number of times.
//!
//! Decision and Switch fragments are **parameterized**: `Decision { take_true }` controls
//! which branch is taken, and `Switch { case }` selects between cases "a" and "b".
//! `Fallible { handler }` represents a system that always fails, routing to its error
//! handler body via an error edge. This enables both hand-written tests with explicit
//! branching and property-based tests that randomly generate fragment trees with varying
//! branch choices.
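//!
//! As an illustrative sketch (not one of the tests below), a fragment tree can be
//! assembled from the constructor helpers defined later in this file. Assuming
//! trackers are indexed in build order, the comments show which tracker index each
//! `track()` leaf would receive:
//!
//! ```ignore
//! let fragment = seq([
//!     track(),                           // tracker 0
//!     par([track(), track()]),           // trackers 1 and 2
//!     decision(false, track(), track()), // tracker 3 (true branch), tracker 4 (false branch)
//!     loop_n(2, fallible(track())),      // tracker 5 (error handler body)
//! ]);
//! ```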
//!
//! ## Prediction Model
//!
//! Each `Fragment` can predict how many times each tracker will fire via `predicted_counts()`.
//! The prediction follows the same traversal order as `build()`:
//! - **Track**: fires once per enclosing execution (multiplied by any enclosing loops)
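//! - **Seq**: concatenates child predictions; children that follow a diverting item
//!   (a `Fallible`, or a nested `Seq` containing one) get zeros
//! - **Par**: concatenates branch predictions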
//! - **Decision**: taken branch gets real counts, non-taken branch gets zeros
//! - **Loop { n }**: multiplies body counts by `n`
//! - **Switch**: selected case gets real counts, unselected case gets zeros
//! - **Fallible**: handler body predictions (failing system has no tracker)
//! - **Scope**: delegates to body (scope executes once, same multiplier)
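//!
//! A few worked examples may help here. Using the constructor helpers defined below,
//! the model would predict the following counts, listed in tracker registration order:
//!
//! ```text
//! seq([track(), loop_n(2, par([track(), track()])), decision(false, track(), track())])
//!   => [1, 2, 2, 0, 1]
//!
//! loop_n(3, loop_n(2, track()))
//!   => [6]
//!
//! seq([track(), fallible(track()), track()])
//!   => [1, 1, 0]   (the tracker after the failing system is treated as unreachable)
//! ```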
//!
//! ## Property-Based Testing
//!
//! The `prop_tests` module uses `proptest` to generate random `Fragment` trees (depth 3,
//! 256 cases). At each non-leaf level, one of the 7 composite types (Seq, Par, Decision,
//! Loop, Switch, Fallible, Scope) is chosen with equal probability, ensuring broad
//! coverage of nesting combinations, including error handling and scope boundary paths.
//! The property asserts that per-node execution counts match the predicted counts for
//! every generated tree.
//!
//! ## Test Infrastructure
//!
//! Uses `Res<SystemInfo>` (injected by `DevToolsPlugin`) to verify that the *correct*
//! nodes executed, not just that some counter incremented. This catches bugs in slot
//! calculation or graph construction that wouldn't be caught by slot-based counting.

mod test_utils;

use polaris_graph::executor::{ExecutionError, GraphExecutor};
use polaris_graph::graph::Graph;
use polaris_graph::node::{ContextPolicy, NodeId};
use test_utils::{
    DecisionOutput, DecisionSystem, ExecutionLog, FailingSystem, SwitchKeySystem, SwitchOutput,
    TrackerNodes, add_tracker, branch, create_test_server, get_hooks,
};

// ═══════════════════════════════════════════════════════════════════════════════
// FRAGMENT DSL
// ═══════════════════════════════════════════════════════════════════════════════

/// Declarative graph fragment for composable test builders.
/// Collects tracker `NodeId`s into a shared `TrackerNodes` during build.
///
/// `Debug` is derived so that `proptest` can display shrunk counterexamples.
#[derive(Clone, Debug)]
enum Fragment {
    /// Single tracking system.
    Track,
    /// Sequential execution: [a, b, c].
    Seq(Vec<Fragment>),
    /// Parallel branches.
    Par(Vec<Fragment>),
    /// Decision with parameterized predicate. When `take_true` is `true`, the
    /// `t` branch executes; otherwise the `f` branch executes.
    Decision {
        take_true: bool,
        t: Box<Fragment>,
        f: Box<Fragment>,
    },
    /// Bounded loop.
    Loop { n: usize, body: Box<Fragment> },
    /// Switch with parameterized discriminator. `case` selects which branch
    /// ("a" or "b") to execute.
    Switch {
        case: &'static str,
        a: Box<Fragment>,
        b: Box<Fragment>,
    },
    /// Fallible system with error handler. The system always fails, and the
    /// handler body executes via the error edge. Execution continues normally
    /// after the handler completes.
    Fallible { handler: Box<Fragment> },
    /// Scope node wrapping a body. Uses `Shared` mode (same context, no boundary)
    /// so that test infrastructure resources (`ExecutionLog`, `SystemInfo`) remain
    /// accessible to inner tracking systems.
    Scope { body: Box<Fragment> },
}

impl Fragment {
    /// Build fragment into graph, collecting tracker node IDs.
    ///
    /// Registration order (which determines tracker index assignment):
    /// - Decision: true branch first, then false branch
    /// - Switch: case "a" first, then case "b"
    fn build(&self, g: &mut Graph, trackers: &TrackerNodes) {
        match self {
            Fragment::Track => {
                trackers.add(add_tracker(g));
            }
            Fragment::Seq(items) => {
                for item in items {
                    item.build(g, trackers);
                }
            }
            Fragment::Par(branches) => {
                let branch_fns: Vec<_> = branches
                    .iter()
                    .map(|b| {
                        let b = b.clone();
                        let trackers = trackers.clone();
                        branch(move |g| b.build(g, &trackers))
                    })
                    .collect();
                g.add_parallel("par", branch_fns);
            }
            Fragment::Decision { take_true, t, f } => {
                let t = t.clone();
                let f = f.clone();
                let t_trackers = trackers.clone();
                let f_trackers = trackers.clone();

                g.add_boxed_system(Box::new(DecisionSystem {
                    take_true: *take_true,
                }));
                g.add_conditional_branch::<DecisionOutput, _, _, _>(
                    "decision",
                    |output| output.take_true,
                    move |g| t.build(g, &t_trackers),
                    move |g| f.build(g, &f_trackers),
                );
            }
            Fragment::Loop { n, body } => {
                let body = body.clone();
                let trackers = trackers.clone();
                g.add_loop_n("loop", *n, move |g| body.build(g, &trackers));
            }
            Fragment::Switch { case, a, b } => {
                let a = a.clone();
                let b = b.clone();
                let a_trackers = trackers.clone();
                let b_trackers = trackers.clone();

                g.add_boxed_system(Box::new(SwitchKeySystem { key: case }));
                g.add_switch::<SwitchOutput, _, _, _>(
                    "switch",
                    |output| output.key,
                    [
                        ("a", branch(move |g| a.build(g, &a_trackers))),
                        ("b", branch(move |g| b.build(g, &b_trackers))),
                    ],
                    None,
                );
            }
            Fragment::Fallible { handler } => {
                let handler = handler.clone();
                let trackers = trackers.clone();
                g.system_boxed(Box::new(FailingSystem))
                    .on_error(move |g| handler.build(g, &trackers))
                    .done();
            }
            Fragment::Scope { body } => {
                let mut inner = Graph::new();
                body.build(&mut inner, trackers);
                g.add_scope("scope", inner, ContextPolicy::shared());
            }
        }
    }

    /// Returns `true` if this fragment always diverts execution away from the
    /// sequential path (the failing system takes the error edge, so the
    /// sequential continuation is unreachable).
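    ///
    /// A few illustrative cases, read directly off the match arms below:
    ///
    /// ```text
    /// fallible(track())                  => true
    /// seq([track(), fallible(track())])  => true   (any diverting child)
    /// scope(fallible(track()))           => false  (falls through to `_ => false`)
    /// loop_n(2, fallible(track()))       => false  (falls through to `_ => false`)
    /// ```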
    fn is_diverting(&self) -> bool {
        match self {
            Fragment::Fallible { .. } => true,
            Fragment::Seq(items) => items.iter().any(Fragment::is_diverting),
            // Scope always completes normally in the test framework because
            // errors inside are caught by the Fallible handler. The error
            // diversion is contained within the scope's inner graph.
            _ => false,
        }
    }

    /// Returns the total number of tracker nodes in this fragment (structural).
    ///
    /// This is independent of branching choices — it counts all `Track` leaves
    /// in both taken and non-taken branches.
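    ///
    /// As a small illustration, using the constructor helpers defined later in this
    /// file, flipping `take_true` leaves the structural count unchanged:
    ///
    /// ```text
    /// decision(true,  par([track(), track()]), track()).tracker_count() => 3
    /// decision(false, par([track(), track()]), track()).tracker_count() => 3
    /// ```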
    fn tracker_count(&self) -> usize {
        match self {
            Fragment::Track => 1,
            Fragment::Seq(items) => items.iter().map(Fragment::tracker_count).sum(),
            Fragment::Par(branches) => branches.iter().map(Fragment::tracker_count).sum(),
            Fragment::Decision { t, f, .. } => t.tracker_count() + f.tracker_count(),
            Fragment::Loop { body, .. } | Fragment::Scope { body } => body.tracker_count(),
            Fragment::Switch { a, b, .. } => a.tracker_count() + b.tracker_count(),
            Fragment::Fallible { handler } => handler.tracker_count(),
        }
    }

    /// Returns a vector of zeros with length equal to `tracker_count()`.
    ///
    /// Used by `predicted_counts_inner` for non-taken branches: the trackers
    /// exist structurally but should never fire, so their expected count is 0.
    fn zero_counts(&self) -> Vec<usize> {
        vec![0; self.tracker_count()]
    }

    /// Returns per-tracker expected execution counts in registration order.
    ///
    /// The returned `Vec` has one entry per `Track` leaf in the fragment tree,
    /// ordered by the same depth-first traversal used in `build()`. Each entry
    /// is the number of times that tracker should fire during execution.
    fn predicted_counts(&self) -> Vec<usize> {
        self.predicted_counts_inner(1)
    }

    /// Inner recursive helper. `multiplier` accumulates from enclosing loops.
    ///
    /// For example, a `Track` inside `Loop { n: 3, .. }` inside `Loop { n: 2, .. }`
    /// would be called with `multiplier = 6` and return `vec![6]`.
    fn predicted_counts_inner(&self, multiplier: usize) -> Vec<usize> {
        match self {
            Fragment::Track => vec![multiplier],
            Fragment::Seq(items) => {
                let mut counts = Vec::new();
                let mut diverted = false;
                for item in items {
                    if diverted {
                        counts.extend(item.zero_counts());
                    } else {
                        counts.extend(item.predicted_counts_inner(multiplier));
                        if item.is_diverting() {
                            diverted = true;
                        }
                    }
                }
                counts
            }
            Fragment::Par(branches) => branches
                .iter()
                .flat_map(|b| b.predicted_counts_inner(multiplier))
                .collect(),
            Fragment::Decision { take_true, t, f } => {
                if *take_true {
                    let mut counts = t.predicted_counts_inner(multiplier);
                    counts.extend(f.zero_counts());
                    counts
                } else {
                    let mut counts = t.zero_counts();
                    counts.extend(f.predicted_counts_inner(multiplier));
                    counts
                }
            }
            Fragment::Loop { n, body } => body.predicted_counts_inner(multiplier * n),
            Fragment::Switch { case, a, b } => {
                if *case == "a" {
                    let mut counts = a.predicted_counts_inner(multiplier);
                    counts.extend(b.zero_counts());
                    counts
                } else {
                    let mut counts = a.zero_counts();
                    counts.extend(b.predicted_counts_inner(multiplier));
                    counts
                }
            }
            // FailingSystem has no tracker; only the handler body has trackers.
            Fragment::Fallible { handler } => handler.predicted_counts_inner(multiplier),
            // Scope executes once; body trackers fire with the same multiplier.
            Fragment::Scope { body } => body.predicted_counts_inner(multiplier),
        }
    }
}

// ─────────────────────────────────────────────────────────────────────────────
// Fragment DSL constructors
// ─────────────────────────────────────────────────────────────────────────────

fn track() -> Fragment {
    Fragment::Track
}

fn seq<I: IntoIterator<Item = Fragment>>(items: I) -> Fragment {
    Fragment::Seq(items.into_iter().collect())
}

fn par<I: IntoIterator<Item = Fragment>>(branches: I) -> Fragment {
    Fragment::Par(branches.into_iter().collect())
}

fn decision(take_true: bool, t: Fragment, f: Fragment) -> Fragment {
    Fragment::Decision {
        take_true,
        t: Box::new(t),
        f: Box::new(f),
    }
}

fn loop_n(n: usize, body: Fragment) -> Fragment {
    Fragment::Loop {
        n,
        body: Box::new(body),
    }
}

fn switch(case: &'static str, a: Fragment, b: Fragment) -> Fragment {
    Fragment::Switch {
        case,
        a: Box::new(a),
        b: Box::new(b),
    }
}

fn fallible(handler: Fragment) -> Fragment {
    Fragment::Fallible {
        handler: Box::new(handler),
    }
}

fn scope(body: Fragment) -> Fragment {
    Fragment::Scope {
        body: Box::new(body),
    }
}

// ═══════════════════════════════════════════════════════════════════════════════
// TEST RUNNER
// ═══════════════════════════════════════════════════════════════════════════════

/// Executes a fragment, returns the execution log and expected tracker nodes.
async fn run_fragment(fragment: Fragment) -> Result<(ExecutionLog, Vec<NodeId>), ExecutionError> {
    let mut graph = Graph::new();
    let trackers = TrackerNodes::default();
    fragment.build(&mut graph, &trackers);
    let expected = trackers.into_vec();

    let server = create_test_server();
    let hooks = get_hooks(&server);

    let mut ctx = server.create_context();
    let log = ExecutionLog::default();
    ctx.insert(log.clone());
    GraphExecutor::new()
        .execute(&graph, &mut ctx, hooks, None)
        .await?;
    Ok((log, expected))
}

// ═══════════════════════════════════════════════════════════════════════════════
// ATOMIC FRAGMENT BUILDERS
// ═══════════════════════════════════════════════════════════════════════════════

/// Sequence of N tracking systems.
fn sequence_n(count: usize) -> Fragment {
    seq((0..count).map(|_| track()))
}

/// Parallel with 2 tracking branches.
fn parallel_2() -> Fragment {
    par([track(), track()])
}

/// Decision that always takes true branch.
fn decision_true() -> Fragment {
    decision(true, track(), track())
}

/// Loop that executes N times with a tracker body.
fn loop_body_n(iterations: usize) -> Fragment {
    loop_n(iterations, track())
}

/// Switch that routes to case "a".
fn switch_to_a() -> Fragment {
    switch("a", track(), track())
}

// ═══════════════════════════════════════════════════════════════════════════════
// COMPOSITE FRAGMENT BUILDERS (Nested Control Flow)
// ═══════════════════════════════════════════════════════════════════════════════

/// before -> parallel([a, b]) -> after
/// Slots: 0 (before), 1 (branch a), 2 (branch b), 3 (after)
fn parallel_with_before_after() -> Fragment {
    seq([track(), par([track(), track()]), track()])
}

/// decision -> `true_branch`: [before -> parallel -> after]
/// Slots: 0-3 (true branch with parallel), 4 (false branch)
fn decision_with_parallel() -> Fragment {
    decision(true, parallel_with_before_after(), track())
}

/// switch -> case "a": [before -> parallel -> after]
/// Slots: 0-3 (case a with parallel), 4 (case b)
fn switch_with_parallel() -> Fragment {
    switch("a", parallel_with_before_after(), track())
}

/// loop(N) -> body: [parallel([a, b])]
/// Slots: 0 (branch a), 1 (branch b) - executed N times each
fn loop_with_parallel(iterations: usize) -> Fragment {
    loop_n(iterations, par([track(), track()]))
}

// ═══════════════════════════════════════════════════════════════════════════════
// COMPLEX FRAGMENT COMPOSITIONS (Multi-level Nesting)
// ═══════════════════════════════════════════════════════════════════════════════

/// Deeply nested structure:
/// decision -> parallel([
///     decision -> parallel -> after,
///     loop(2) -> parallel
/// ]) -> `after_all`
///
/// Tests 3 levels of nesting with mixed control flow types.
/// Slots layout:
/// 0-4: first parallel branch (decision with parallel; slot 4 is its untaken false branch)
/// 5-6: second parallel branch (loop with parallel, 2 iterations)
/// 7: after all
/// 8: outer false branch (not taken)
fn complex_nested() -> Fragment {
    decision(
        true,
        seq([
            par([decision_with_parallel(), loop_with_parallel(2)]),
            track(), // after_all
        ]),
        track(), // outer false branch
    )
}

// ═══════════════════════════════════════════════════════════════════════════════
// TESTS: Progressively More Complex
// ═══════════════════════════════════════════════════════════════════════════════

/// Verifies sequential execution: 3 systems in order.
#[tokio::test]
async fn test_sequence() {
    let (log, nodes) = run_fragment(sequence_n(3)).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1);
    assert_eq!(log.count(&nodes[1]), 1);
    assert_eq!(log.count(&nodes[2]), 1);
    // Verify execution order
    assert_eq!(log.executed(), nodes);
}

/// Verifies parallel execution: both branches run.
#[tokio::test]
async fn test_parallel() {
    let (log, nodes) = run_fragment(parallel_2()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "branch a should execute");
    assert_eq!(log.count(&nodes[1]), 1, "branch b should execute");
}

/// Verifies decision execution: only true branch runs.
#[tokio::test]
async fn test_decision() {
    let (log, nodes) = run_fragment(decision_true()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "true branch should execute");
    assert_eq!(log.count(&nodes[1]), 0, "false branch should not execute");
}

/// Verifies loop execution: body runs N times.
#[tokio::test]
async fn test_loop() {
    let (log, nodes) = run_fragment(loop_body_n(5)).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 5, "loop body should execute 5 times");
}

/// Verifies switch execution: only matched case runs.
#[tokio::test]
async fn test_switch() {
    let (log, nodes) = run_fragment(switch_to_a()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "case a should execute");
    assert_eq!(log.count(&nodes[1]), 0, "case b should not execute");
}

/// Verifies parallel with convergence: before -> parallel -> after all execute.
#[tokio::test]
async fn test_parallel_converges() {
    let (log, nodes) = run_fragment(parallel_with_before_after()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "before should execute");
    assert_eq!(log.count(&nodes[1]), 1, "branch a should execute");
    assert_eq!(log.count(&nodes[2]), 1, "branch b should execute");
    assert_eq!(
        log.count(&nodes[3]),
        1,
        "after should execute (convergence)"
    );
}

/// Verifies decision containing parallel: all inner nodes execute including after.
#[tokio::test]
async fn test_decision_with_nested_parallel() {
    let (log, nodes) = run_fragment(decision_with_parallel()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "before parallel should execute");
    assert_eq!(log.count(&nodes[1]), 1, "parallel branch a should execute");
    assert_eq!(log.count(&nodes[2]), 1, "parallel branch b should execute");
    assert_eq!(log.count(&nodes[3]), 1, "after parallel should execute");
    assert_eq!(log.count(&nodes[4]), 0, "false branch should not execute");
}

/// Verifies switch containing parallel: all inner nodes execute including after.
#[tokio::test]
async fn test_switch_with_nested_parallel() {
    let (log, nodes) = run_fragment(switch_with_parallel()).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "before parallel should execute");
    assert_eq!(log.count(&nodes[1]), 1, "parallel branch a should execute");
    assert_eq!(log.count(&nodes[2]), 1, "parallel branch b should execute");
    assert_eq!(log.count(&nodes[3]), 1, "after parallel should execute");
    assert_eq!(log.count(&nodes[4]), 0, "case b should not execute");
}

/// Verifies loop containing parallel: parallel executes on each iteration.
#[tokio::test]
async fn test_loop_with_nested_parallel() {
    let (log, nodes) = run_fragment(loop_with_parallel(3)).await.unwrap();

    assert_eq!(
        log.count(&nodes[0]),
        3,
        "parallel branch a should execute 3 times"
    );
    assert_eq!(
        log.count(&nodes[1]),
        3,
        "parallel branch b should execute 3 times"
    );
}

/// 3 levels of nesting with mixed control flow.
/// Structure: decision -> parallel([decision->parallel->after, loop->parallel]) -> after
#[tokio::test]
async fn test_complex_nested_composition() {
    let (log, nodes) = run_fragment(complex_nested()).await.unwrap();

    // First parallel branch: decision -> parallel -> after (nodes 0-4)
    assert_eq!(log.count(&nodes[0]), 1, "inner decision: before parallel");
    assert_eq!(log.count(&nodes[1]), 1, "inner decision: parallel branch a");
    assert_eq!(log.count(&nodes[2]), 1, "inner decision: parallel branch b");
    assert_eq!(log.count(&nodes[3]), 1, "inner decision: after parallel");
    assert_eq!(
        log.count(&nodes[4]),
        0,
        "inner decision: false branch not taken"
    );

    // Second parallel branch: loop(2) -> parallel (nodes 5-6, each 2x)
    assert_eq!(
        log.count(&nodes[5]),
        2,
        "loop parallel branch a (2 iterations)"
    );
    assert_eq!(
        log.count(&nodes[6]),
        2,
        "loop parallel branch b (2 iterations)"
    );

    // After the outer parallel
    assert_eq!(log.count(&nodes[7]), 1, "after outer parallel");

    // False branch not taken
    assert_eq!(log.count(&nodes[8]), 0, "outer false branch not taken");
}

/// Verifies that a fallible system routes to its error handler.
#[tokio::test]
async fn test_fallible_handler_executes() {
    let (log, nodes) = run_fragment(fallible(track())).await.unwrap();
    assert_eq!(log.count(&nodes[0]), 1, "error handler tracker should fire");
}

/// Verifies fallible in a sequence: before -> fallible(handler) -> after.
/// The "after" node is unreachable because the error edge diverts execution
/// away from the sequential path (the sequential edge from `FailingSystem` to
/// the after node is only taken on success).
#[tokio::test]
async fn test_fallible_in_sequence() {
    let fragment = seq([track(), fallible(track()), track()]);
    let (log, nodes) = run_fragment(fragment).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "before should execute");
    assert_eq!(log.count(&nodes[1]), 1, "error handler should execute");
    assert_eq!(
        log.count(&nodes[2]),
        0,
        "after should not execute (unreachable after error diversion)"
    );
}

/// Verifies fallible inside a loop: handler fires on each iteration.
#[tokio::test]
async fn test_fallible_in_loop() {
    let fragment = loop_n(3, fallible(track()));
    let (log, nodes) = run_fragment(fragment).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 3, "error handler should fire 3 times");
}

/// Verifies fallible with a multi-node handler body.
#[tokio::test]
async fn test_fallible_with_seq_handler() {
    let fragment = fallible(seq([track(), track()]));
    let (log, nodes) = run_fragment(fragment).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "handler step 1 should fire");
    assert_eq!(log.count(&nodes[1]), 1, "handler step 2 should fire");
}

/// Parameterized depth test: builds N levels of nested decisions, each containing parallel.
#[tokio::test]
async fn test_arbitrary_depth_nesting() {
    const DEPTH: usize = 5;

    /// Recursively builds nested decisions with parallel branches.
    fn nested_decisions(depth: usize) -> Fragment {
        if depth == 0 {
            track()
        } else {
            // decision -> true: parallel([nested, nested]), false: empty seq
            decision(
                true,
                par([nested_decisions(depth - 1), nested_decisions(depth - 1)]),
                seq([]), // Empty false branch
            )
        }
    }

    let result = run_fragment(nested_decisions(DEPTH)).await;

    assert!(
        result.is_ok(),
        "Depth {} nesting failed: {:?}",
        DEPTH,
        result.err()
    );
}

/// Verifies scope node: inner body executes, scope is transparent.
#[tokio::test]
async fn test_scope() {
    let (log, nodes) = run_fragment(scope(track())).await.unwrap();
    assert_eq!(log.count(&nodes[0]), 1, "scope body should execute");
}

/// Verifies scope in a sequence with parallel body.
#[tokio::test]
async fn test_scope_in_sequence() {
    let fragment = seq([track(), scope(par([track(), track()])), track()]);
    let (log, nodes) = run_fragment(fragment).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 1, "before scope");
    assert_eq!(log.count(&nodes[1]), 1, "scope parallel branch a");
    assert_eq!(log.count(&nodes[2]), 1, "scope parallel branch b");
    assert_eq!(log.count(&nodes[3]), 1, "after scope");
}

/// Verifies scope inside a loop: body runs on each iteration.
#[tokio::test]
async fn test_scope_in_loop() {
    let fragment = loop_n(3, scope(track()));
    let (log, nodes) = run_fragment(fragment).await.unwrap();

    assert_eq!(log.count(&nodes[0]), 3, "scope body should execute 3 times");
}

/// Demonstrates arbitrary composition with the Fragment DSL.
#[tokio::test]
async fn test_arbitrary_composition() {
    // Complex nested structure - fully declarative
    let complex = seq([
        track(),
        par([
            decision(true, par([track(), track()]), track()),
            loop_n(2, par([track(), track()])),
        ]),
        track(),
    ]);

    let (log, nodes) = run_fragment(complex).await.unwrap();

    // Verify execution counts:
    // nodes[0]: initial track
    assert_eq!(log.count(&nodes[0]), 1, "initial track");
    // nodes[1-2]: decision true branch parallel
    assert_eq!(log.count(&nodes[1]), 1, "decision true branch a");
    assert_eq!(log.count(&nodes[2]), 1, "decision true branch b");
    // nodes[3]: decision false branch (not taken)
    assert_eq!(log.count(&nodes[3]), 0, "decision false branch not taken");
    // nodes[4-5]: loop parallel (2 iterations each)
    assert_eq!(log.count(&nodes[4]), 2, "loop branch a (2 iterations)");
    assert_eq!(log.count(&nodes[5]), 2, "loop branch b (2 iterations)");
    // nodes[6]: final track
    assert_eq!(log.count(&nodes[6]), 1, "final track");
}

// ═══════════════════════════════════════════════════════════════════════════════
// PROPERTY-BASED TESTS
// ═══════════════════════════════════════════════════════════════════════════════

/// Property-based tests that generate random `Fragment` trees and verify that
/// execution counts match the predicted counts from `predicted_counts()`.
///
/// ## Property Under Test
///
/// **For all valid fragment trees, `predicted_counts()` equals the execution counts.**
///
/// This tests agreement between two implementations:
/// 1. `predicted_counts()` — a simple recursive prediction model
/// 2. `Fragment::build()` + `GraphExecutor::execute()` — the real implementation
///
/// ## Ground Truth
///
/// This property test alone cannot catch bugs where both implementations have
/// the same error. Ground truth comes from the hand-written unit tests above
/// (`test_loop`, `test_decision`, etc.) which assert hard-coded expected values.
///
/// Given that basic behaviors are verified by unit tests, this property test
/// verifies that **nested compositions** of those behaviors produce consistent
/// results across many randomly generated combinations.
///
/// ## Strategy Design
///
/// `arb_fragment(depth)` generates fragment trees recursively:
/// - **Leaf level** (`depth == 0`): Always produces `Track` — the only leaf type.
/// - **Inner levels** (`depth > 0`): Chooses among all 7 composite types with
///   equal probability. `Track` is excluded at inner levels so every generated
///   tree reaches full depth, preventing trivial trees.
///
/// With 7 composite types at each of 3 levels, 256 cases sample a broad range of
/// nesting combinations, though they do not enumerate them exhaustively.
///
/// ## Parameterized Branching
///
/// - `Decision`: `take_true` is randomly `true` or `false`, exercising both branches
/// - `Switch`: `case` is randomly `"a"` or `"b"`, exercising both cases
/// - `Loop`: iteration count ranges from 1 to 4
/// - `Seq`: 1 to 3 children
/// - `Par`: 2 to 4 branches
///
/// ## Async Handling
///
/// `proptest` does not natively support async test functions. Each test case
/// creates a `tokio::runtime::Runtime` and uses `block_on()` to run the async
/// `run_fragment` function synchronously within the proptest closure.
mod prop_tests {
    use super::*;
    use proptest::prelude::*;

    /// Generates a random `Fragment` tree of the given depth.
    ///
    /// At `depth == 0`, only `Track` leaves are produced.
    /// At `depth > 0`, only composite types are generated (equal weight),
    /// ensuring every tree reaches full depth. Includes `Fallible` to exercise
    /// error edge routing and `Scope` to exercise scope boundaries under
    /// arbitrary nesting.
    fn arb_fragment(depth: u32) -> BoxedStrategy<Fragment> {
        if depth == 0 {
            Just(Fragment::Track).boxed()
        } else {
            prop_oneof![
                prop::collection::vec(arb_fragment(depth - 1), 1..=3usize).prop_map(Fragment::Seq),
                prop::collection::vec(arb_fragment(depth - 1), 2..=4usize).prop_map(Fragment::Par),
                (
                    any::<bool>(),
                    arb_fragment(depth - 1),
                    arb_fragment(depth - 1)
                )
                    .prop_map(|(b, t, f)| decision(b, t, f)),
                (1..=4usize, arb_fragment(depth - 1)).prop_map(|(n, body)| loop_n(n, body)),
                (
                    prop_oneof![Just("a"), Just("b")],
                    arb_fragment(depth - 1),
                    arb_fragment(depth - 1),
                )
                    .prop_map(|(c, a, b)| switch(c, a, b)),
                arb_fragment(depth - 1).prop_map(fallible),
                arb_fragment(depth - 1).prop_map(scope),
            ]
            .boxed()
        }
    }

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(256))]

        /// For every randomly generated fragment tree, the per-node
        /// execution count must match the prediction from `predicted_counts()`.
        #[test]
        fn prop_per_node_execution_matches_prediction(fragment in arb_fragment(3)) {
            let rt = tokio::runtime::Runtime::new().expect("tokio runtime");
            rt.block_on(async {
                let expected = fragment.predicted_counts();
                let (log, nodes) = run_fragment(fragment).await.expect("execution");
                prop_assert_eq!(nodes.len(), expected.len(), "tracker count mismatch");
                for (i, (node, expected_count)) in nodes.iter().zip(&expected).enumerate() {
                    prop_assert_eq!(
                        log.count(node),
                        *expected_count,
                        "tracker[{}]", i
                    );
                }
                Ok(())
            })?;
        }
    }
}