graphrefly-core 0.0.4

GraphReFly handle-protocol core dispatcher
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
//! Slice A close (M1) regression tests — lock-released `invoke_fn` /
//! `custom_equals` + R1.3.5.a handshake tier-split + late-subscriber-
//! during-wave race fix.
//!
//! These exercise the wave-engine paths that moved off the v1
//! "lock-held during binding callback" discipline. Each test is named for
//! the canonical-spec rule or porting-deferred entry it closes:
//!
//! - **`fire_fn` lock-released `invoke_fn`** — fns may re-enter Core during
//!   their own fire.
//! - **`commit_emission` lock-released `custom_equals`** — equals oracles
//!   may re-enter Core during evaluation.
//! - **R1.3.5.a handshake tier-split** — `[Start]` + `[Data(v)]` arrive as
//!   separate sink calls, not a bundled call.
//! - **Late-subscriber during wave** — a sink installed between fn-fire
//!   iterations does NOT receive duplicate `[Dirty, Data]` from already-
//!   queued wave messages (sink-snapshot-on-first-touch).

mod common;

use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use common::{RecordedEvent, Recorder, TestBinding, TestRuntime, TestValue};

use graphrefly_core::{
    BindingBoundary, Core, DepBatch, EqualsMode, FnId, FnResult, HandleId, Message, NodeId, Sink,
};

// ---------------------------------------------------------------------------
// Fn re-entrance via invoke_fn
// ---------------------------------------------------------------------------

#[test]
fn fn_can_reenter_core_emit_during_invoke_fn_runs_nested_wave() {
    // A derived's fn calls `Core::emit(other_state, ...)` mid-fire. The
    // nested emit should run a nested wave (in_tick re-entrance) and
    // queue downstream work; the outer drain picks it up.
    //
    // **Phase H+ topology requirement (2026-05-09):** the cross-partition
    // emit during fire must obey ascending-order acquisition (Phase H+
    // option (d) limited variant). To satisfy that, declare s_side as a
    // meta-companion of s_in so the wave's `compute_touched_partitions`
    // acquires {partition(s_in), partition(s_side)} ascending UPFRONT;
    // the inner emit on s_side then becomes a re-entrant acquire (already
    // held by this thread) rather than a fresh descending acquire.
    let rt = TestRuntime::new();
    let s_in = rt.state(Some(TestValue::Int(0)));
    let s_side = rt.state(Some(TestValue::Int(0)));

    // Derived from s_in. When it fires, it ALSO emits a side effect on
    // s_side via Core::emit — which previously deadlocked because the
    // state lock was held across invoke_fn.
    let core = rt.core.clone();
    let s_side_id = s_side.id;
    let binding = rt.binding.clone();
    let d = rt.derived(&[s_in.id], move |deps| {
        if let TestValue::Int(n) = deps[0] {
            // Re-enter Core::emit lock-released. Should NOT deadlock.
            //
            // Phase H+ /qa N1(a) (2026-05-09): gate on `n > 0` so the
            // cross-partition emit fires from a TOP-LEVEL wave entry
            // (`s_in.set(7)` below) rather than from d's activation-
            // time fire. Activation goes through `Core::subscribe`'s
            // direct `partition_wave_owner_lock_arc` (NOT `begin_batch_for`),
            // which doesn't walk meta_companions; the cross-partition
            // acquire would then be descending and panic. Top-level
            // `s_in.set(...)` does go through `begin_batch_for(s_in)`
            // which walks `s_in`'s meta_companions and acquires
            // `{partition(s_in), partition(s_side)}` ascending upfront,
            // so the inner emit is re-entrant on a held partition.
            if n > 0 {
                let h = binding.intern(TestValue::Int(n * 10));
                core.emit(s_side_id, h);
            }
        }
        Some(deps[0].clone())
    });
    // Phase H+ topology requirement: declare s_side as a meta-companion
    // of s_in so the top-level `s_in.set(...)` wave below acquires
    // {partition(s_in), partition(s_side)} ascending upfront via
    // `compute_touched_partitions(s_in)`. Inner emit on s_side is
    // then a re-entrant acquire on a partition already held by the
    // wave, not a fresh descending acquire from inside fn-fire.
    //
    // /qa A7 (2026-05-09): note that `add_meta_companion` ALSO
    // induces R1.3.9.d TEARDOWN cascade — when `s_in` is torn down,
    // `s_side` will receive a Teardown event. This test samples its
    // assertions BEFORE `rt` drops, so the cascade doesn't affect
    // observable behavior. Future variants that inspect state AFTER
    // an explicit teardown must account for the cascaded Teardown
    // on s_side's recorder (filter or assert).
    rt.core.add_meta_companion(s_in.id, s_side_id);

    let rec_d = rt.subscribe_recorder(d);
    let rec_side = rt.subscribe_recorder(s_side.id);
    s_in.set(TestValue::Int(7));

    // d: activation-time Data(0), then Data(7) from the `set` wave.
    assert_eq!(
        rec_d.data_values(),
        vec![TestValue::Int(0), TestValue::Int(7)]
    );
    // s_side: handshake Data(0), then Data(70) emitted from inside d's fn
    // (the activation-time fire saw n == 0 and skipped the emit).
    assert_eq!(
        rec_side.data_values(),
        vec![TestValue::Int(0), TestValue::Int(70)]
    );
}

#[test]
fn fn_can_reenter_core_pause_resume_during_invoke_fn() {
    // A fn calls Core::pause and Core::resume on another node mid-fire.
    // Both calls must succeed (no deadlock) because invoke_fn runs with
    // the state lock released.
    let rt = TestRuntime::new();
    let s_in = rt.state(Some(TestValue::Int(0)));
    let s_other = rt.state(Some(TestValue::Int(100)));

    let core = rt.core.clone();
    let s_other_id = s_other.id;
    let pause_lock = core.alloc_lock_id();
    // The only assertions in this test run inside the fn, so count fires:
    // without this, a fn that never fired would let the test pass vacuously.
    let fires = Arc::new(AtomicU64::new(0));
    let fires_for_fn = fires.clone();
    let d = rt.derived(&[s_in.id], move |_deps| {
        fires_for_fn.fetch_add(1, Ordering::SeqCst);
        // Pause + resume on s_other from inside fn — both lock-released.
        core.pause(s_other_id, pause_lock).expect("pause");
        let report = core.resume(s_other_id, pause_lock).expect("resume");
        // Both calls succeeded; pause buffer was empty so no replay.
        if let Some(r) = report {
            assert_eq!(r.replayed, 0);
            assert_eq!(r.dropped, 0);
        }
        None
    });

    let _rec_d = rt.subscribe_recorder(d);
    s_in.set(TestValue::Int(1));

    // Activation-time fire (s_in is cached at 0) plus the fire driven by
    // `set(1)` — each ran the pause/resume pair without deadlocking.
    assert!(
        fires.load(Ordering::SeqCst) >= 2,
        "expected d's fn to fire at activation and on set(1), got {} fires",
        fires.load(Ordering::SeqCst)
    );
}

#[test]
fn fn_can_reenter_core_invalidate_during_invoke_fn() {
    // A derived's fn calls `Core::invalidate` on another node mid-fire;
    // with invoke_fn lock-released this must not deadlock, and the
    // invalidate must be observable on the target's subscribers.
    let rt = TestRuntime::new();
    let s_in = rt.state(Some(TestValue::Int(0)));
    let s_other = rt.state(Some(TestValue::Int(100)));
    let s_other_id = s_other.id;

    // Subscribe rec_other BEFORE d is activated so it observes every event
    // on s_other. d's activation-time fire is gated off below (`n != 0`
    // with s_in == 0), but subscribing first keeps the test robust if that
    // gate ever changes: R1.4 makes a repeated invalidate a silent no-op,
    // so an invalidate delivered before the recorder existed could not be
    // observed later.
    let rec_other = rt.subscribe_recorder(s_other_id);

    let core = rt.core.clone();
    let d = rt.derived(&[s_in.id], move |deps| {
        // Phase H+ /qa N1(a) (2026-05-09): gate on `n != 0` so the
        // cross-partition invalidate fires from a TOP-LEVEL wave
        // entry (`s_in.set(1)` below), not from d's activation-time
        // fire. Same rationale as
        // `fn_can_reenter_core_emit_during_invoke_fn_runs_nested_wave`
        // above — subscribe-time activation goes through
        // `partition_wave_owner_lock_arc` directly, bypassing the
        // meta-companion walk in `compute_touched_partitions`.
        if let TestValue::Int(n) = deps[0] {
            if n != 0 {
                core.invalidate(s_other_id);
            }
        }
        Some(deps[0].clone())
    });
    // Phase H+ topology requirement: every wave entering d's fire
    // must touch s_other's partition upfront. The top-level
    // `s_in.set(...)` below drives begin_batch_for(s_in) which walks
    // s_in's meta_companions to s_other, acquires both partitions
    // ascending, then d's fire's inner invalidate on s_other is
    // re-entrant on a held partition.
    rt.core.add_meta_companion(s_in.id, s_other_id);

    let _rec_d = rt.subscribe_recorder(d);

    // d's activation-time fire saw s_in == 0 and skipped the invalidate;
    // this set is the wave whose fire actually invalidates s_other.
    s_in.set(TestValue::Int(1));

    // s_other got invalidated from inside the fn; rec_other observed it.
    assert!(rec_other.snapshot().contains(&RecordedEvent::Invalidate));
}

// ---------------------------------------------------------------------------
// custom_equals re-entrance
// ---------------------------------------------------------------------------

#[test]
fn custom_equals_can_reenter_core_during_emission() {
    // A custom equals oracle reads cache via Core::cache_of from inside
    // its evaluation. Previously held the lock across the binding call →
    // deadlock. Lock-released now → succeeds.
    let rt = TestRuntime::new();
    let s_in = rt.state(Some(TestValue::Int(0)));
    let probe_state = rt.state(Some(TestValue::Int(99)));

    let core = rt.core.clone();
    let probe_id = probe_state.id;
    // Count oracle invocations so the test proves the re-entrant path was
    // actually exercised (the DATA assertions alone would also pass if the
    // oracle were never consulted).
    let equals_calls = Arc::new(AtomicU64::new(0));
    let equals_calls_for_oracle = equals_calls.clone();
    // Custom equals: ALWAYS returns false (force is_data path), but
    // re-enters Core via cache_of mid-evaluation.
    let d = rt.derived_with_equals(
        &[s_in.id],
        |deps| Some(deps[0].clone()),
        move |_a, _b| {
            equals_calls_for_oracle.fetch_add(1, Ordering::SeqCst);
            // Re-enter Core::cache_of from inside custom_equals — lock-released.
            let _ = core.cache_of(probe_id);
            false
        },
    );

    let rec = rt.subscribe_recorder(d);
    s_in.set(TestValue::Int(1));
    s_in.set(TestValue::Int(2));

    // Both updates flowed through (custom equals returned false → DATA).
    assert_eq!(
        rec.data_values(),
        vec![TestValue::Int(0), TestValue::Int(1), TestValue::Int(2)]
    );
    // Each set produced a value differing from d's cached one, so the
    // oracle must have been consulted (and re-entered Core) at least once.
    assert!(
        equals_calls.load(Ordering::SeqCst) >= 1,
        "custom equals oracle never ran — re-entrance path not exercised"
    );
}

// ---------------------------------------------------------------------------
// R1.3.5.a handshake tier-split
// ---------------------------------------------------------------------------

#[test]
fn handshake_tier_split_sentinel_state_one_call() {
    // A sentinel state (constructed with no initial value) has no cached
    // DATA to replay, so its subscribe handshake is a lone `[Start]`
    // delivered in exactly one sink call.
    let rt = TestRuntime::new();
    let sentinel = rt.state(None);

    let recorder = rt.subscribe_recorder(sentinel.id);

    let expected_boundaries = vec![1];
    let expected_events = vec![RecordedEvent::Start];
    assert_eq!(recorder.call_count(), 1, "sentinel handshake = 1 sink call");
    assert_eq!(recorder.call_boundaries(), expected_boundaries);
    assert_eq!(recorder.snapshot(), expected_events);
}

#[test]
fn handshake_tier_split_cached_state_two_calls() {
    // R1.3.5.a per-tier delivery: subscribing to a state that already
    // caches a value yields `[Start]` and then `[Data(v)]` as two separate
    // sink calls — never a single bundled `[Start, Data(v)]` call.
    let rt = TestRuntime::new();
    let cached = rt.state(Some(TestValue::Int(42)));

    let recorder = rt.subscribe_recorder(cached.id);

    let expected_boundaries = vec![1, 1];
    let expected_events = vec![
        RecordedEvent::Start,
        RecordedEvent::Data(TestValue::Int(42)),
    ];
    assert_eq!(recorder.call_count(), 2, "cached handshake = 2 sink calls");
    assert_eq!(recorder.call_boundaries(), expected_boundaries);
    assert_eq!(recorder.snapshot(), expected_events);
}

#[test]
fn handshake_tier_split_terminated_state_three_calls() {
    // A cached state that was completed before anyone subscribed replays
    // its whole history one tier per sink call: `[Start]`, then
    // `[Data(cached)]`, then `[Complete]`.
    let rt = TestRuntime::new();
    let completed = rt.state(Some(TestValue::Int(7)));
    rt.core.complete(completed.id);

    let recorder = rt.subscribe_recorder(completed.id);

    let expected_boundaries = vec![1, 1, 1];
    let expected_events = vec![
        RecordedEvent::Start,
        RecordedEvent::Data(TestValue::Int(7)),
        RecordedEvent::Complete,
    ];
    assert_eq!(
        recorder.call_count(),
        3,
        "cached + complete handshake = 3 sink calls"
    );
    assert_eq!(recorder.call_boundaries(), expected_boundaries);
    assert_eq!(recorder.snapshot(), expected_events);
}

#[test]
fn handshake_tier_split_torndown_node_four_calls() {
    // Subscribing to a cached state after `teardown` replays four tiers,
    // each in its own sink call: `[Start]`, `[Data]`, `[Complete]`,
    // `[Teardown]`.
    let rt = TestRuntime::new();
    let torn_down = rt.state(Some(TestValue::Int(5)));
    rt.core.teardown(torn_down.id);

    let recorder = rt.subscribe_recorder(torn_down.id);

    let expected_boundaries = vec![1, 1, 1, 1];
    let expected_events = vec![
        RecordedEvent::Start,
        RecordedEvent::Data(TestValue::Int(5)),
        RecordedEvent::Complete,
        RecordedEvent::Teardown,
    ];
    assert_eq!(recorder.call_count(), 4, "torn-down handshake = 4 sink calls");
    assert_eq!(recorder.call_boundaries(), expected_boundaries);
    assert_eq!(recorder.snapshot(), expected_events);
}

#[test]
fn handshake_tier_split_error_terminated_three_calls() {
    // Error-terminated counterpart of the completed-state case: a cached
    // state that errored before subscription replays `[Start]`,
    // `[Data(cached)]`, `[Error(payload)]` as three separate sink calls.
    let rt = TestRuntime::new();
    let errored = rt.state(Some(TestValue::Int(1)));
    let error_payload = rt.binding.intern(TestValue::Str("boom".into()));
    rt.core.error(errored.id, error_payload);

    let recorder = rt.subscribe_recorder(errored.id);

    let expected_boundaries = vec![1, 1, 1];
    let expected_events = vec![
        RecordedEvent::Start,
        RecordedEvent::Data(TestValue::Int(1)),
        RecordedEvent::Error(TestValue::Str("boom".into())),
    ];
    assert_eq!(recorder.call_count(), 3);
    assert_eq!(recorder.call_boundaries(), expected_boundaries);
    assert_eq!(recorder.snapshot(), expected_events);
}

// ---------------------------------------------------------------------------
// Late-subscriber-during-wave: sink-snapshot-on-first-touch race fix
// ---------------------------------------------------------------------------

#[test]
fn late_subscriber_installed_after_first_queue_notify_does_not_double_receive_data() {
    // Race-fix scenario (sink-snapshot-on-first-touch):
    //
    // The fn calls `core.emit(s, new_value)` (nested wave), then
    // `core.subscribe(s, late_sink)`. The nested emit's commit_emission
    // queues Dirty+Data on `s` BEFORE the late subscribe happens, so the
    // pending_notify entry's snapshot for `s` is taken without late_sink.
    // After subscribe runs, late_sink IS installed in s's `subscribers`,
    // but the wave's flush iterates `pending_notify[s].sinks` (the
    // snapshot), which excludes late_sink. The late subscriber only sees
    // its handshake `[Start, Data(new)]` — no duplicate Dirty+Data from
    // the wave's deferred flush.
    //
    // Without the snapshot-on-first-touch fix (the pre-Slice-A-close
    // model that re-snapshotted subscribers at flush time), late_sink
    // would have received both its handshake [Start, Data(new)] AND the
    // wave's [Dirty, Data(new)] — a duplicate Data delivery.
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));

    // Pre-existing subscriber on `s` so its pending_notify entry has a
    // non-empty sink snapshot at first touch (proves the snapshot is
    // populated, not just empty).
    let rec_existing = rt.subscribe_recorder(s.id);

    // Pre-create the late recorder.
    let late_recorder: Arc<Mutex<Option<Recorder>>> = Arc::new(Mutex::new(None));
    let late_rec = Recorder::new();
    let late_sink = late_rec.sink(rt.binding.clone());
    *late_recorder.lock().unwrap() = Some(late_rec);

    let core = rt.core.clone();
    let s_id = s.id;
    let binding = rt.binding.clone();
    let late_recorder_for_fn = late_recorder.clone();
    let late_sink_holder: Arc<Mutex<Option<Sink>>> = Arc::new(Mutex::new(Some(late_sink)));
    let late_sink_for_fn = late_sink_holder.clone();
    let trigger = rt.state(Some(TestValue::Int(0)));
    // Phase H+ topology requirement: declare s as a meta-companion of
    // trigger so the wave entered via trigger.set acquires both
    // partitions ascending UPFRONT; the nested emit + subscribe on s
    // are re-entrant acquires on a held partition.
    rt.core.add_meta_companion(trigger.id, s_id);

    let d = rt.derived(&[trigger.id], move |deps| {
        if let TestValue::Int(n) = deps[0] {
            if n > 0 {
                // 1. Emit a new value to `s` from inside the fn (nested wave
                //    — commit_emission queues Dirty+Data on s with the
                //    snapshot of s's subscribers AT THIS MOMENT).
                let h = binding.intern(TestValue::Int(n * 100));
                core.emit(s_id, h);
                // 2. Subscribe a new sink to `s` AFTER the emit. The
                //    pending_notify[s] snapshot is already locked in;
                //    late_sink is not in it.
                if let Some(sink) = late_sink_for_fn.lock().unwrap().take() {
                    let sub = core.subscribe(s_id, sink);
                    if let Some(rec) = late_recorder_for_fn.lock().unwrap().as_ref() {
                        rec.attach(sub);
                    }
                }
            }
        }
        None
    });

    // Subscribe to the derived. Its activation-time fire sees trigger's
    // cached Int(0), so the `n > 0` branch is skipped: no emit on `s` and
    // no late subscribe happen yet.
    let _rec_d = rt.subscribe_recorder(d);

    // Trigger the derived's fire.
    trigger.set(TestValue::Int(7));

    // Existing subscriber on `s` sees the full sequence — handshake then
    // wave updates.
    let existing_events = rec_existing.snapshot();
    let existing_data: Vec<i64> = existing_events
        .iter()
        .filter_map(|e| match e {
            RecordedEvent::Data(TestValue::Int(n)) => Some(*n),
            _ => None,
        })
        .collect();
    assert_eq!(
        existing_data,
        vec![0, 700],
        "existing subscriber: handshake Data(0) + wave Data(700)"
    );

    // Late subscriber sees ONLY its handshake — `[Start, Data(700)]` as
    // 2 sink calls. NO duplicated Dirty+Data from the wave's flush.
    let late = late_recorder.lock().unwrap();
    let late_rec = late.as_ref().expect("late recorder");
    let late_events = late_rec.snapshot();
    assert_eq!(
        late_events,
        vec![
            RecordedEvent::Start,
            RecordedEvent::Data(TestValue::Int(700)),
        ],
        "late subscriber duplicated Data: {late_events:?}"
    );
    assert_eq!(
        late_rec.call_count(),
        2,
        "late subscriber: handshake split into [Start] + [Data(700)] = 2 calls"
    );
}

// ---------------------------------------------------------------------------
// Concurrent emits: disjoint partitions run in parallel, same partition serializes
// ---------------------------------------------------------------------------

#[test]
fn concurrent_emit_on_disjoint_partitions_runs_truly_parallel() {
    // Slice Y1 / Phase E (D3, 2026-05-08) — INVERTED from the legacy
    // M1 contract `concurrent_emit_blocks_until_in_flight_wave_completes`.
    //
    // The legacy test asserted that cross-thread emits BLOCK at the
    // Core-global `wave_owner` re-entrant mutex until the in-flight
    // wave's drain + flush + sink-fire completes. Per session-doc Q4 +
    // decision D3 / D092, the wave engine now uses per-partition
    // `wave_owner` mutexes — two threads emitting on DISJOINT partitions
    // run truly parallel, the canonical Y1 parallelism win.
    //
    // Test shape:
    //   1. Thread A starts a wave on `s_a`'s partition (`s_a → d`)
    //      whose fn blocks on `rx.recv()`. Thread A holds partition(s_a)'s
    //      `wave_owner` for the wave's duration.
    //   2. Thread B emits on `s_b` — a state node in a DISJOINT partition
    //      (no dep edges connect s_a/s_b, so union-find keeps them apart).
    //   3. The test thread observes Thread B's emit FINISH while Thread A
    //      is still parked in its fn (before `tx.send`), proving the
    //      per-partition mutexes don't block Thread B on Thread A's held
    //      lock. The wait is bounded generously (10s) for slow CI — the
    //      proof is the event ordering, not the elapsed time.
    //   4. Test thread releases Thread A's fn via `tx.send(())`.
    //   5. Thread A's wave completes; both threads join cleanly.
    //
    // Same-partition serialization is verified by the companion test
    // `concurrent_emit_on_same_partition_serializes` below.
    let rt = TestRuntime::new();
    // Sentinel s_a so subscribe_recorder doesn't trigger an activation-
    // time fire that blocks before thread-A even spawns.
    let s_a = rt.state(None);
    let s_b = rt.state(Some(TestValue::Int(0)));

    let (tx, rx) = std::sync::mpsc::channel::<()>();
    let rx = Arc::new(Mutex::new(Some(rx)));
    let rx_for_fn = rx.clone();
    let fn_entered = Arc::new(AtomicU64::new(0));
    let fn_entered_for_fn = fn_entered.clone();

    let d = rt.derived(&[s_a.id], move |deps| {
        fn_entered_for_fn.fetch_add(1, Ordering::SeqCst);
        // Block until the test thread releases. The lock-protected Option
        // lets us take() the receiver safely once. If the test thread
        // panics first, `tx` is dropped and `recv` returns Err, so this
        // never blocks forever.
        let recv = rx_for_fn.lock().unwrap().take();
        if let Some(rx) = recv {
            let _ = rx.recv();
        }
        Some(deps[0].clone())
    });

    let _rec_d = rt.subscribe_recorder(d);

    // Sanity: partition(s_a) and partition(s_b) are disjoint by construction
    // (no dep edges between them). The Y1 parallelism premise depends on
    // this — pin it so a future regression that accidentally unions the
    // partitions doesn't silently turn this into a same-partition race.
    let p_a = rt.core.partition_of(s_a.id).expect("registered");
    let p_b = rt.core.partition_of(s_b.id).expect("registered");
    assert_ne!(
        p_a, p_b,
        "s_a and s_b must start in disjoint partitions for the parallelism win"
    );

    // Thread A: emit on s_a triggers the first fn fire (which blocks on rx).
    // Thread A holds `partition(s_a).wave_owner` for the wave's duration.
    let core_a = rt.core.clone();
    let s_a_id = s_a.id;
    let binding_a = rt.binding.clone();
    let thread_a = thread::spawn(move || {
        let h = binding_a.intern(TestValue::Int(1));
        core_a.emit(s_a_id, h);
    });

    // Wait for thread A's fn to enter (so partition(s_a)'s wave_owner is
    // held). Bounded by wall-clock time: counting 1ms sleeps undercounts,
    // since each sleep may oversleep.
    let enter_deadline = std::time::Instant::now() + Duration::from_secs(5);
    while fn_entered.load(Ordering::SeqCst) == 0 {
        assert!(
            std::time::Instant::now() < enter_deadline,
            "thread A's fn never entered — emit may have deadlocked"
        );
        thread::sleep(Duration::from_millis(1));
    }

    // Thread B: emits on s_b. Under Y1 / Phase E, this acquires only
    // partition(s_b)'s `wave_owner` — DISJOINT from partition(s_a)'s
    // held by Thread A. EXPECTED to FINISH BEFORE tx.send (the
    // unblock for thread A's fn). Atomic flag enables deterministic
    // event-ordering check (QA-fix group 2 — the earlier 2s timing
    // window was CI-flaky under thread-spawn contention).
    let thread_b_done = Arc::new(AtomicU64::new(0));
    let thread_b_done_for_thread = thread_b_done.clone();
    let core_b = rt.core.clone();
    let s_b_id = s_b.id;
    let binding_b = rt.binding.clone();
    let thread_b = thread::spawn(move || {
        let h = binding_b.intern(TestValue::Int(2));
        core_b.emit(s_b_id, h);
        thread_b_done_for_thread.store(1, Ordering::SeqCst);
    });

    // Wait for Thread B's emit to complete. Generous safety bound (10s)
    // tolerates very slow CI without false-failing parallelism. If the
    // wave engine regressed to whole-Core serialization, Thread B
    // would block until tx.send, but tx.send hasn't been called yet —
    // so a regression manifests as the timeout firing here, not as a
    // false-positive ordering.
    let parallelism_window_start = std::time::Instant::now();
    while thread_b_done.load(Ordering::SeqCst) == 0 {
        assert!(
            parallelism_window_start.elapsed() < Duration::from_secs(10),
            "Thread B's cross-thread emit on a DISJOINT partition should \
             have finished BEFORE tx.send (truly parallel under per-partition \
             wave_owner). Instead it blocked — Y1 parallelism regression?"
        );
        thread::sleep(Duration::from_millis(5));
    }
    // Deterministic event-ordering check: Thread B's emit has completed
    // while Thread A is still parked inside its fn (tx.send has not run,
    // so A's `rx.recv()` cannot have returned). Under whole-Core
    // serialization this state is unreachable. A finished Thread A here
    // would mean its wave ended early (e.g. panicked) and the parallelism
    // claim is unproven.
    assert!(
        !thread_a.is_finished(),
        "Thread A must still be blocked in its fn when Thread B completes"
    );

    // Release Thread A's fn — its wave can now drain + finish.
    tx.send(()).expect("send to unblock fn");

    // Join a worker, failing (instead of hanging) if it has not finished
    // within `secs` seconds of wall-clock time. Compares full `Duration`s
    // so the bound is not stretched by `as_secs()` truncation.
    let join_with_timeout = |handle: thread::JoinHandle<()>, secs: u64| {
        let limit = Duration::from_secs(secs);
        let start = std::time::Instant::now();
        loop {
            if handle.is_finished() {
                handle.join().expect("thread panicked");
                return;
            }
            if start.elapsed() > limit {
                panic!("thread did not finish within {secs}s — likely deadlock");
            }
            thread::sleep(Duration::from_millis(5));
        }
    };
    join_with_timeout(thread_a, 5);
    join_with_timeout(thread_b, 5);
}

#[test]
fn concurrent_emit_on_same_partition_serializes() {
    // Slice Y1 / Phase E (D3, 2026-05-08) — companion to
    // `concurrent_emit_on_disjoint_partitions_runs_truly_parallel`.
    //
    // Same-partition emits MUST still serialize on the partition's
    // `wave_owner`. Without this, a wave's lock-released drain could
    // let a concurrent same-partition emit absorb into the in-flight
    // wave's `pending_notify` and return before subscribers fire —
    // breaking the user-facing "emit returning means subscribers have
    // observed" contract.
    //
    // Test shape: same as the disjoint-partition test, but `s_a` and
    // `s_b` share a partition (`s_b` depends on `s_a` via `_d_join`).
    // Thread B's emit MUST block on partition's wave_owner until
    // Thread A's wave completes.

    // How long Thread B is given to (wrongly) complete its emit while
    // Thread A holds the shared partition's wave_owner. Used both for the
    // sleep and the failure message so the two cannot drift apart.
    const BLOCK_WINDOW_MS: u64 = 100;

    let rt = TestRuntime::new();
    let s_a = rt.state(None);
    let s_b = rt.state(Some(TestValue::Int(0)));
    // _d_join unions partition(s_a) and partition(s_b) via dep edges.
    let _d_join = rt.derived(&[s_a.id, s_b.id], |_deps| Some(TestValue::Int(0)));

    let (tx, rx) = std::sync::mpsc::channel::<()>();
    let rx = Arc::new(Mutex::new(Some(rx)));
    let rx_for_fn = rx.clone();
    let fn_entered = Arc::new(AtomicU64::new(0));
    let fn_entered_for_fn = fn_entered.clone();

    let blocking_d = rt.derived(&[s_a.id], move |deps| {
        fn_entered_for_fn.fetch_add(1, Ordering::SeqCst);
        // Park until the test thread sends (or drops `tx` on panic).
        let recv = rx_for_fn.lock().unwrap().take();
        if let Some(rx) = recv {
            let _ = rx.recv();
        }
        Some(deps[0].clone())
    });
    let _rec_d = rt.subscribe_recorder(blocking_d);

    // Sanity: s_a and s_b share a partition (unioned via _d_join).
    let p_a = rt.core.partition_of(s_a.id).expect("registered");
    let p_b = rt.core.partition_of(s_b.id).expect("registered");
    assert_eq!(
        p_a, p_b,
        "s_a and s_b must share a partition for the serialization assertion"
    );

    let core_a = rt.core.clone();
    let s_a_id = s_a.id;
    let binding_a = rt.binding.clone();
    let thread_a = thread::spawn(move || {
        let h = binding_a.intern(TestValue::Int(1));
        core_a.emit(s_a_id, h);
    });

    // Wall-clock bounded wait (counting 1ms sleeps undercounts real time).
    let enter_deadline = std::time::Instant::now() + Duration::from_secs(5);
    while fn_entered.load(Ordering::SeqCst) == 0 {
        assert!(
            std::time::Instant::now() < enter_deadline,
            "thread A's fn never entered"
        );
        thread::sleep(Duration::from_millis(1));
    }

    // Thread B uses entry/exit atomic flags so we can deterministically
    // distinguish "B started but is blocked" from "B never started"
    // (QA-fix group 2 — the earlier 100ms sleep + `is_finished()` check
    // was vacuous on slow CI: if B hadn't been scheduled yet, the
    // assertion `!is_finished()` would pass for the wrong reason).
    let thread_b_entered = Arc::new(AtomicU64::new(0));
    let thread_b_exited = Arc::new(AtomicU64::new(0));
    let entered_for_b = thread_b_entered.clone();
    let exited_for_b = thread_b_exited.clone();
    let core_b = rt.core.clone();
    let s_b_id = s_b.id;
    let binding_b = rt.binding.clone();
    let thread_b = thread::spawn(move || {
        let h = binding_b.intern(TestValue::Int(2));
        entered_for_b.store(1, Ordering::SeqCst);
        core_b.emit(s_b_id, h);
        exited_for_b.store(1, Ordering::SeqCst);
    });

    // Wait for Thread B to ENTER (so we know it's actually attempting
    // the emit, not just unscheduled).
    let start_deadline = std::time::Instant::now() + Duration::from_secs(5);
    while thread_b_entered.load(Ordering::SeqCst) == 0 {
        assert!(
            std::time::Instant::now() < start_deadline,
            "Thread B never started"
        );
        thread::sleep(Duration::from_millis(1));
    }

    // Now that Thread B is in `core.emit(s_b, h)` and Thread A holds
    // partition(s_a)'s wave_owner (same partition by union via
    // _d_join), Thread B SHOULD block on the partition's wave_owner.
    // Give it BLOCK_WINDOW_MS of wall-clock time to be sure the emit
    // attempt has been made + blocked, then assert exit flag is still 0.
    thread::sleep(Duration::from_millis(BLOCK_WINDOW_MS));
    assert_eq!(
        thread_b_exited.load(Ordering::SeqCst),
        0,
        "Thread B entered the emit ({}ms ago) but its emit must block \
         on the held partition wave_owner; the exit flag set early \
         would mean same-partition emits raced through.",
        BLOCK_WINDOW_MS
    );

    tx.send(()).expect("send to unblock fn");

    // Join a worker, failing (instead of hanging) if it has not finished
    // within `secs` seconds; full-`Duration` compare avoids `as_secs()`
    // truncation stretching the bound.
    let join_with_timeout = |handle: thread::JoinHandle<()>, secs: u64| {
        let limit = Duration::from_secs(secs);
        let start = std::time::Instant::now();
        loop {
            if handle.is_finished() {
                handle.join().expect("thread panicked");
                return;
            }
            if start.elapsed() > limit {
                panic!("thread did not finish within {secs}s");
            }
            thread::sleep(Duration::from_millis(5));
        }
    };
    join_with_timeout(thread_a, 5);
    join_with_timeout(thread_b, 5);
}

// ---------------------------------------------------------------------------
// Refcount discipline preserved across the lock-released refactor
// ---------------------------------------------------------------------------

#[test]
fn lock_released_refactor_does_not_leak_handles_under_basic_emit() {
    // Smoke test that the per-iteration lock-acquisition pattern in
    // drain_and_flush + the sinks-snapshot fix in queue_notify don't
    // corrupt refcount discipline.
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));

    // Named without a leading underscore: the recorder is used (dropped
    // explicitly below), and the drop point is part of what is tested.
    let rec_d = rt.subscribe_recorder(d);
    for i in 1..=10 {
        s.set(TestValue::Int(i));
    }
    drop(rec_d);
    drop(s);

    // After dropping the subscriber + state, the live-handle count should be
    // small (expected: the derived's cached value plus possibly one
    // transient). Exact accounting would require exposing internals, so
    // only an upper bound is checked via the binding's live count.
    let live_now = rt.binding.live_handles();
    assert!(
        live_now <= 2,
        "expected <= 2 live handles after drop (derived cache + maybe a transient), got {live_now}"
    );

    // Drop the runtime — final Drop for CoreState should release every
    // remaining retained handle.
    drop(rt);
}

// ---------------------------------------------------------------------------
// Direct binding hooks used by the dispatcher's lock-released paths
// ---------------------------------------------------------------------------

/// Test-only `BindingBoundary` that calls back into `Core::cache_of` from
/// inside `invoke_fn`, i.e. while a node's fn is being fired.
///
/// Used to exercise the "lock released around invoke_fn" discipline
/// directly against `Core`, bypassing TestRuntime's Sink wrapping. The
/// re-entrant call only happens once both `core_slot` and `probe_node`
/// have been populated by the test.
struct ReentrantBinding {
    /// Wrapped binding that does the real interning / fn dispatch.
    inner: Arc<TestBinding>,
    /// Core to re-enter; `None` until the test wires it in.
    core_slot: Mutex<Option<Core>>,
    /// Node whose cache is read during the re-entrant call; `None` until set.
    probe_node: Mutex<Option<NodeId>>,
}

impl ReentrantBinding {
    /// Builds an unwired binding: no Core and no probe node yet, so
    /// `invoke_fn` initially just delegates to the inner binding.
    fn new() -> Arc<Self> {
        let unwired = ReentrantBinding {
            inner: TestBinding::new(),
            core_slot: Mutex::default(),
            probe_node: Mutex::default(),
        };
        Arc::new(unwired)
    }
}

impl BindingBoundary for ReentrantBinding {
    /// Re-enters `Core::cache_of(probe)` (when both the Core and the probe
    /// node have been wired in), then delegates to the inner binding.
    ///
    /// If the dispatcher held its state lock around this call, the
    /// re-entrant `cache_of` would deadlock; reaching the delegation proves
    /// the lock was released.
    fn invoke_fn(&self, node_id: NodeId, fn_id: FnId, dep_data: &[DepBatch]) -> FnResult {
        // Snapshot the wiring and release both test-side mutexes *before*
        // calling back into Core. Using the guards as `if let` scrutinee
        // temporaries would keep them locked across `cache_of`, serialising
        // concurrent fires on this mutex and self-deadlocking if the Core
        // call ever re-entered `invoke_fn` on this thread.
        let core = self.core_slot.lock().unwrap().clone();
        let probe = *self.probe_node.lock().unwrap();
        if let (Some(core), Some(probe)) = (core, probe) {
            let _ = core.cache_of(probe);
        }
        self.inner.invoke_fn(node_id, fn_id, dep_data)
    }

    /// Pure delegation to the inner binding.
    fn custom_equals(&self, equals_handle: FnId, a: HandleId, b: HandleId) -> bool {
        self.inner.custom_equals(equals_handle, a, b)
    }

    /// Pure delegation to the inner binding.
    fn release_handle(&self, handle: HandleId) {
        self.inner.release_handle(handle);
    }

    /// Pure delegation to the inner binding.
    fn retain_handle(&self, handle: HandleId) {
        self.inner.retain_handle(handle);
    }
}

#[test]
fn invoke_fn_can_call_core_cache_of_directly() {
    let binding = ReentrantBinding::new();
    let core = Core::new(binding.clone() as Arc<dyn BindingBoundary>);

    // Wire Core back into the binding. This forms an Arc cycle
    // (binding → Core → binding) that is broken explicitly at the end.
    *binding.core_slot.lock().unwrap() = Some(core.clone());

    // Set up a state probe + a derived whose fn will re-enter Core.
    let probe_h = binding.inner.intern(TestValue::Int(42));
    let probe_id = core.register_state(probe_h, false).unwrap();
    *binding.probe_node.lock().unwrap() = Some(probe_id);

    let in_h = binding.inner.intern(TestValue::Int(1));
    let in_id = core.register_state(in_h, false).unwrap();
    let fn_id = binding.inner.register_fn(|deps| Some(deps[0].clone()));
    let d = core
        .register_derived(&[in_id], fn_id, EqualsMode::Identity, false)
        .unwrap();

    // Subscribe to drive activation → fn fires → invoke_fn re-enters
    // Core::cache_of(probe_id). If the state lock were held around
    // invoke_fn this would deadlock; with it released the call returns.
    let rec_events: Arc<Mutex<Vec<Message>>> = Arc::new(Mutex::new(Vec::new()));
    let rec_events_for_sink = rec_events.clone();
    let sink: Sink = Arc::new(move |msgs: &[Message]| {
        rec_events_for_sink.lock().unwrap().extend_from_slice(msgs);
    });
    let _sub = core.subscribe(d, sink);

    // No deadlock + derived produced its first DATA. The guard is scoped so
    // the recorder mutex is free again before teardown.
    {
        let events = rec_events.lock().unwrap();
        assert!(
            events.iter().any(|m| matches!(m, Message::Start)),
            "subscriber should have received START"
        );
        assert!(
            events.iter().any(|m| matches!(m, Message::Data(_))),
            "derived should have emitted its first DATA after the re-entrant fire"
        );
    }

    // Break the binding → Core → binding Arc cycle; otherwise neither is
    // ever dropped and Core's teardown (handle release) never runs here.
    // Take under the lock, drop after the guard is released.
    let detached_core = binding.core_slot.lock().unwrap().take();
    drop(detached_core);
}