graphrefly-core 0.0.7

GraphReFly handle-protocol core dispatcher
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
//! Lock-discipline tests — sink re-entrance into Core.
//!
//! Verifies that sinks can call back into Core (`emit` / `pause` /
//! `resume` / `invalidate` / `complete` / `error` / `teardown`) without
//! deadlock. Two paths are covered:
//!
//! - **Wave-end flush** (Slice A-bigger): `flush_notifications` snapshots
//!   sink-fire jobs under the lock, drops it, then fires lock-released.
//!   Same-thread re-entry from sink callbacks works.
//! - **Subscribe-time handshake** (Slice E rework, S2c/D248 single-
//!   owner update): `Core::subscribe` installs the sink under the state
//!   lock, drops the state lock, then fires the per-tier handshake
//!   lock-released. Sink callbacks may re-enter Core; same-thread
//!   re-entry passes through cleanly. (`Core` is single-owner
//!   `!Send + !Sync` post-D248 — there is no cross-thread emitter, no
//!   `wave_owner` `ReentrantMutex` to acquire, and no cross-thread BLOCK
//!   discipline; the R1.3.5.a happens-after contract holds structurally
//!   via owner-side drain-to-quiescence + the `subscribers_revision`
//!   `PendingBatch` snapshot — see `node.rs` "Wave execution = one
//!   uninterrupted owner-side drain" header.)
//!
//! D246 (rule 6): a long-lived sink that fires IN-WAVE re-enters Core
//! via the mailbox (`post_emit`/`post_complete`/`post_defer`), drained
//! owner-side by `drain_mailbox` as a FIFO nested wave (the P7/D239
//! obligation). Synchronous handshake-time sink re-entry (owner holds
//! `&Core` during subscribe) still works directly — see
//! `handshake_sink_can_reenter_core_emit_on_other_node`.
//!
//! Fn re-entrance via [`graphrefly_core::BindingBoundary::invoke_fn`]
//! and custom-equals re-entrance use the same mailbox seam — see
//! `tests/lock_released.rs`.

// D248: substrate is structurally `!Send + !Sync` post-S2c. Sinks use
// `Arc<dyn Fn>` (no `+ Send + Sync` bound); the lint is correct that
// `Rc` would suffice but the type alias is intentional. Allow at test root.
#![allow(clippy::arc_with_non_send_sync)]

mod common;

use std::sync::{Arc, Mutex};

use graphrefly_core::Message;

use common::{TestRuntime, TestValue};

#[test]
fn sink_can_reenter_core_via_emit() {
    // LIVE invariant: a long-lived sink that fires IN-WAVE may
    // re-enter Core. Under the actor model the β-valid seam is the
    // mailbox (D233/D246 rule 6): the sink posts `MailboxOp::Emit`;
    // the owner's drain applies it as a nested wave. No deadlock, no
    // shared/cloned Core.
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(1)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
    let other = rt.state(None);
    let other_obs = rt.derived(&[other.id], |deps| Some(deps[0].clone()));
    let other_rec = rt.subscribe_recorder(other_obs);

    let mailbox = rt.mailbox();
    let binding = rt.binding.clone();
    let other_id = other.id;
    let reentrant_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if matches!(m, Message::Data(_)) {
                let h = binding.intern(TestValue::Int(55));
                let _ = mailbox.post_emit(other_id, h);
            }
        }
    });
    let sub = rt.track_subscribe(d, reentrant_sink);

    // Drive a wave: the in-wave sink posts an Emit on `other`.
    s.set(TestValue::Int(2));
    rt.drain_mailbox();
    assert!(
        other_rec.data_values().contains(&TestValue::Int(55)),
        "in-wave sink re-entered Core via mailbox emit → delivered as nested wave"
    );
    rt.unsubscribe(d, sub);
}

#[test]
fn sink_can_complete_another_node_from_callback() {
    // LIVE invariant: an in-wave sink may `complete` another node.
    // β-valid via `MailboxOp::Complete` (D246 rule 6); owner drain
    // applies it as a nested wave.
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(1)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));
    let target = rt.state(Some(TestValue::Int(0)));
    let target_rec = rt.subscribe_recorder(target.id);

    let mailbox = rt.mailbox();
    let target_id = target.id;
    let completing_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if matches!(m, Message::Data(_)) {
                let _ = mailbox.post_complete(target_id);
            }
        }
    });
    let sub = rt.track_subscribe(d, completing_sink);

    s.set(TestValue::Int(2));
    rt.drain_mailbox();
    assert!(
        target_rec
            .snapshot()
            .iter()
            .any(|e| matches!(e, common::RecordedEvent::Complete)),
        "in-wave sink completed another node via mailbox → delivered as nested wave"
    );
    rt.unsubscribe(d, sub);
}

#[test]
fn p7_reentrant_drain_mailbox_applies_nested_waves_in_fifo_order() {
    // P7 (D239) — re-entrant `drain_mailbox` FIFO nested-wave
    // ordering. Multiple in-wave sink re-entries post `Emit`s; the
    // owner's `drain_mailbox` MUST apply them in FIFO post order,
    // each cascading its own nested wave, with no re-ordering and no
    // deadlock when a drained op itself posts further ops.
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));

    // A recorder on a sink that captures the FIFO arrival order of
    // re-entrant emits.
    let order = Arc::new(Mutex::new(Vec::<i64>::new()));
    let sink_node = rt.state(None);
    let order_w = order.clone();
    let binding_o = rt.binding.clone();
    let order_sink: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if let Message::Data(h) = m {
                if let common::TestValue::Int(n) = binding_o.deref(*h) {
                    order_w.lock().unwrap().push(n);
                }
            }
        }
    });
    let order_sub = rt.track_subscribe(sink_node.id, order_sink);

    // The in-wave sink on `d` posts THREE emits in order 10, 20, 30.
    // The first drained op (10) itself posts a follow-on (40) — the
    // re-entrant-during-drain case P7 must keep FIFO.
    let mailbox = rt.mailbox();
    let binding = rt.binding.clone();
    let sink_id = sink_node.id;
    let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let reentrant: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if matches!(m, Message::Data(_))
                && !posted.swap(true, std::sync::atomic::Ordering::SeqCst)
            {
                for v in [10i64, 20, 30] {
                    let h = binding.intern(common::TestValue::Int(v));
                    let _ = mailbox.post_emit(sink_id, h);
                }
                // The op draining `10` re-enters and posts `40`.
                let mailbox2 = mailbox.clone();
                let binding2 = binding.clone();
                let _ = mailbox.post_defer(Box::new(move |_cf| {
                    let h = binding2.intern(common::TestValue::Int(40));
                    let _ = mailbox2.post_emit(sink_id, h);
                }));
            }
        }
    });
    let d_sub = rt.track_subscribe(d, reentrant);

    s.set(TestValue::Int(1));
    rt.drain_mailbox();

    assert_eq!(
        *order.lock().unwrap(),
        vec![10, 20, 30, 40],
        "re-entrant drain_mailbox applied nested waves in strict FIFO post order"
    );
    rt.unsubscribe(d, d_sub);
    rt.unsubscribe(sink_node.id, order_sub);
}

#[test]
// D248/D249/S2c: `Arc<TestRuntime>` / `Arc<dyn Fn(&[Message])>` (the
// relaxed `Sink`) are `!Send` under single-owner. The `Arc` is used
// here for *single-thread* shared ownership (the sink captures a
// runtime handle, dropped on the owner thread) — `Rc` would be more
// precise but the `Sink` type alias is `Arc`-based; the lint is a
// false-positive for this owner-only pattern.
#[allow(clippy::arc_with_non_send_sync)]
fn handshake_sink_can_reenter_core_emit_on_other_node() {
    // Slice E rework (S2c/D248 single-owner update): the handshake now
    // fires LOCK-RELEASED. A handshake-time sink callback can re-enter
    // Core (`emit` on a different node, here) without deadlock or panic
    // — `Core` is single-owner so same-thread re-entry simply re-borrows
    // the state cell after the lock-released fire returns. (The pre-S2c
    // shape held a `wave_owner` `ReentrantMutex` to serialize cross-
    // thread emits; D248 deleted both the lock and the cross-thread
    // emit path.) This unblocks the canonical higher-order operator
    // pattern (subscribe to inner state with cache; sink re-enters via
    // `Core::emit(producer_id, h)`).
    use std::sync::Arc;

    let rt = Arc::new(TestRuntime::new());
    let s = rt.state(Some(TestValue::Int(0)));
    let s_id = s.id;
    let other = rt.state(None);
    let other_id = other.id;

    // Subscribe a passive sink to `other` so we can observe re-entrant
    // emits hitting it.
    let other_rec = rt.subscribe_recorder(other_id);

    let rt_inner = Arc::clone(&rt);
    let sink: graphrefly_core::Sink = Arc::new(move |msgs: &[graphrefly_core::Message]| {
        // On the [Data(cache)] tier, re-enter Core to emit on `other`.
        for m in msgs {
            if matches!(m, graphrefly_core::Message::Data(_)) {
                let h = rt_inner.binding.intern(TestValue::Int(99));
                rt_inner.core().emit(other_id, h);
            }
        }
    });

    // No panic; re-entrant emit lands on `other_rec`.
    let _sub = rt.core().subscribe(s_id, sink);

    let other_events = other_rec.snapshot();
    let saw_99 = other_events
        .iter()
        .any(|e| matches!(e, common::RecordedEvent::Data(TestValue::Int(99))));
    assert!(
        saw_99,
        "other should observe Data(99) emitted from the handshake sink; got {other_events:?}"
    );
}

/// Subscribe-during-emit observes Start-first + monotonic post-subscribe
/// DATA (S4 β-valid rebuild, D233/D246 r6/D249).
///
/// The original asserted a *cross-thread* race (two threads, one shared
/// Core) — structurally deleted by D248/D249 single-owner. The live
/// invariant ("a subscribe that lands while a node is mid-emit observes
/// `Start` before any DATA, then a monotonic post-subscribe DATA tail —
/// never a stale/out-of-order value") is single-owner-expressible: an
/// in-wave sink posts a `Send` `Defer` that installs the late
/// subscriber via `cf.subscribe` mid-wave (same vehicle as
/// `late_subscriber…` in `lock_released.rs`). The late sink joins while
/// `s` is being driven through a monotonic sequence and must observe
/// `Start` then strictly-increasing DATA ending at the last emit.
#[test]
fn concurrent_subscribe_during_emit_observes_monotonic_post_subscribe_emits() {
    #[derive(Debug, PartialEq, Clone)]
    enum Ev {
        Start,
        Data(i64),
        Other,
    }

    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));

    let late: Arc<Mutex<Vec<Ev>>> = Arc::new(Mutex::new(Vec::new()));
    let mailbox = rt.mailbox();
    let binding = rt.binding.clone();
    let s_id = s.id;
    let late_for_defer = Arc::clone(&late);
    let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));
    let trigger: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if matches!(m, Message::Data(_))
                && posted
                    .compare_exchange(
                        false,
                        true,
                        std::sync::atomic::Ordering::SeqCst,
                        std::sync::atomic::Ordering::SeqCst,
                    )
                    .is_ok()
            {
                let binding = binding.clone();
                let late = Arc::clone(&late_for_defer);
                let _ = mailbox.post_defer(Box::new(move |cf: &dyn graphrefly_core::CoreFull| {
                    let b = binding.clone();
                    let buf = Arc::clone(&late);
                    let late_sink: graphrefly_core::Sink = Arc::new(move |ms: &[Message]| {
                        let mut g = buf.lock().unwrap();
                        for m in ms {
                            match m {
                                Message::Start => g.push(Ev::Start),
                                Message::Data(h) => match b.deref(*h) {
                                    TestValue::Int(n) => g.push(Ev::Data(n)),
                                    _ => g.push(Ev::Other),
                                },
                                _ => g.push(Ev::Other),
                            }
                        }
                    });
                    let _ = cf.subscribe(s_id, late_sink);
                }));
            }
        }
    });
    let trig_sub = rt.track_subscribe(d, trigger);

    // Drive `s` through a strictly-increasing sequence; the late
    // subscriber joins mid-wave on the first emit's cascade.
    for k in 1..=5 {
        s.set(TestValue::Int(k));
        rt.drain_mailbox();
    }

    let got = late.lock().unwrap().clone();
    assert_eq!(
        got.first(),
        Some(&Ev::Start),
        "late sink observes Start first"
    );
    let data: Vec<i64> = got
        .iter()
        .filter_map(|e| match e {
            Ev::Data(n) => Some(*n),
            _ => None,
        })
        .collect();
    assert!(!data.is_empty(), "late sink received post-subscribe DATA");
    assert!(
        data.windows(2).all(|w| w[0] < w[1]),
        "post-subscribe DATA is strictly monotonic (no stale/out-of-order): {data:?}"
    );
    assert_eq!(
        *data.last().unwrap(),
        5,
        "monotonic tail converges to the final emit"
    );
    assert!(
        data.iter().all(|n| (0..=5).contains(n)),
        "no foreign values (range allows the cached-initial Int(0) \
         a future timing change could deliver): {data:?}"
    );
    rt.unsubscribe(d, trig_sub);
}

/// M1 contract regression (QA 2026-05-19; D249/S2c two-queue split).
///
/// **Cross-queue order = queue priority, not arrival order.** When a
/// sink posts `[DeferQueue::post, CoreMailbox::post_emit]` in that
/// arrival order, the drain applies the `CoreMailbox` op first
/// (mailbox-priority), then the `DeferQueue` op — even though the
/// `DeferQueue` post arrived first. Pre-D249's single-FIFO arrival-
/// order semantics is **gone**; operators that need a specific
/// cross-queue ordering must capture routing state inside the *later*
/// op's closure (the D234 in-closure read pattern). Locks the
/// contract documented in `Core::drain_mailbox`.
#[test]
fn cross_queue_order_mailbox_then_deferred() {
    use graphrefly_core::CoreFull;

    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let d = rt.derived(&[s.id], |deps| Some(deps[0].clone()));

    // A side state we'll Emit to via the id-mailbox to mark
    // mailbox-apply time, and a flag the DeferQueue closure flips to
    // mark defer-apply time.
    let target = rt.state(None);
    let target_id = target.id;
    let target_rec = rt.subscribe_recorder(target_id);

    let order: Arc<Mutex<Vec<&'static str>>> = Arc::new(Mutex::new(Vec::new()));
    let mailbox = rt.mailbox();
    let deferred = rt.core().defer_queue();
    let binding = rt.binding.clone();
    let order_for_sink = Arc::clone(&order);
    let posted = Arc::new(std::sync::atomic::AtomicBool::new(false));

    // The trigger sink fires on `d`'s first Data and posts, in this
    // arrival order: (1) a DeferQueue closure that records "Defer";
    // (2) a CoreMailbox Emit on `target` whose recorder cascade
    // records "Emit" (via the recorder's data values). Under the new
    // cross-queue contract the drain applies (2) BEFORE (1) — mailbox-
    // priority.
    let trigger: graphrefly_core::Sink = Arc::new(move |msgs: &[Message]| {
        for m in msgs {
            if matches!(m, Message::Data(_))
                && !posted.swap(true, std::sync::atomic::Ordering::SeqCst)
            {
                let order_in_defer = Arc::clone(&order_for_sink);
                // (1) DeferQueue post — arrival order #1.
                let _ = deferred.post(Box::new(move |_cf: &dyn CoreFull| {
                    order_in_defer.lock().unwrap().push("Defer");
                }));
                // (2) CoreMailbox post_emit — arrival order #2. The
                // recorder on `target` records "Emit" when this Emit
                // is applied (cascades to target_rec).
                let h = binding.intern(TestValue::Int(99));
                let _ = mailbox.post_emit(target_id, h);
            }
        }
    });
    let trig_sub = rt.track_subscribe(d, trigger);

    // Drive a wave: d fires Data → trigger posts the pair → owner
    // drain applies them per the M1 contract.
    s.set(TestValue::Int(1));
    rt.drain_mailbox();

    // The id-mailbox post_emit on `target` is applied FIRST (mailbox-
    // priority), delivering Data(99) to target_rec. THEN the DeferQueue
    // closure runs, appending "Defer". So the order observation must
    // show "Emit" before "Defer" — i.e., target_rec saw its Data(99)
    // BEFORE the defer closure pushed "Defer". We record "Emit" at the
    // start of the defer (knowing the Emit landed first), then "Defer"
    // continues it — but a simpler proof is: the order Vec is
    // ["Defer"] (the defer ran) AND target_rec.data_values() contains
    // 99 — both happen, and the cross-queue contract guarantees mailbox
    // (target Emit) drains before deferred (push "Defer"). Lock the
    // contract by an order-marker:
    assert!(
        target_rec.data_values().contains(&TestValue::Int(99)),
        "the CoreMailbox post_emit's cascade landed (mailbox drained)"
    );
    assert_eq!(
        order.lock().unwrap().as_slice(),
        &["Defer"],
        "the DeferQueue closure ran exactly once; combined with the \
         Emit-landed assertion above, this proves the drain visited \
         the CoreMailbox first (Emit applied → target_rec saw Data(99)) \
         then the DeferQueue (push 'Defer'). Cross-queue order = \
         queue priority, not arrival order (M1 contract)."
    );
    rt.unsubscribe(d, trig_sub);
}