1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
//! Subscription lifecycle — S2b/β owner-invoked contract (D225/D241).
//!
//! The core RAII `Subscription` (drop-deregisters, `Weak`-upgrade
//! silent-no-op, `Send + Sync` cross-thread move) is **retired** under
//! the actor model: a parameterless `Drop` cannot reach a relocating
//! owned `Core` (D225). `Core::subscribe` now returns a plain `Copy`
//! `SubscriptionId`; deregistration is the explicit owner-invoked
//! `Core::unsubscribe(node, id)` carrying the exact lock-released
//! lifecycle chain. RAII convenience is a binding-layer concern
//! (binding/embedder holds its Core on its affinity worker and calls
//! `core.unsubscribe` from its wrapper's `Drop`); the `TestRuntime`
//! harness models that (tracks subs, tears down on its own `Drop`).
//! This file verifies the replacement contract.
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;
mod common;
use common::{TestRuntime, TestValue};
/// Attach a sink to `node_id` that tallies every DATA message it is handed.
///
/// Returns the shared tally together with the `SubscriptionId` produced by
/// the subscribe call (β: a plain `Copy` id, not an RAII handle), so the
/// caller can unsubscribe explicitly later.
fn data_counter(
    rt: &TestRuntime,
    node_id: graphrefly_core::NodeId,
) -> (Arc<AtomicU32>, graphrefly_core::SubscriptionId) {
    let counter = Arc::new(AtomicU32::new(0));
    // The sink owns its own handle to the tally; the caller keeps `counter`.
    let tally = Arc::clone(&counter);
    let sink: graphrefly_core::Sink = Arc::new(move |batch: &[graphrefly_core::Message]| {
        batch
            .iter()
            .filter(|msg| matches!(msg, graphrefly_core::Message::Data(_)))
            .for_each(|_| {
                tally.fetch_add(1, Ordering::SeqCst);
            });
    });
    let sub = rt.core().subscribe(node_id, sink);
    (counter, sub)
}
/// After an owner-invoked unsubscribe (β/D225), the sink must stop
/// receiving DATA for later writes to the node.
#[test]
fn unsubscribe_deregisters_sink() {
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(0)));
    let (data_seen, sub_id) = data_counter(&rt, state.id);

    // Subscribing pushes the cached value (0), so one DATA is expected up front.
    let after_subscribe = data_seen.load(Ordering::SeqCst);
    assert_eq!(
        after_subscribe, 1,
        "push-on-subscribe should deliver one DATA"
    );

    // Deregister explicitly, then write: the tally must not move.
    rt.core().unsubscribe(state.id, sub_id);
    state.set(TestValue::Int(1));
    let after_write = data_seen.load(Ordering::SeqCst);
    assert_eq!(after_write, 1, "no DATA after unsubscribe");
}
/// Unsubscribing the same id twice must neither panic nor disturb the
/// deregistration performed by the first call.
#[test]
fn double_unsubscribe_is_safe() {
    let rt = TestRuntime::new();
    let s = rt.state(Some(TestValue::Int(0)));
    let (counter, sub) = data_counter(&rt, s.id);
    rt.core().unsubscribe(s.id, sub);
    // Idempotent: a second unsubscribe of the same id is a no-op, not
    // a panic (monotonic-never-recycled ids; F4 invariant).
    rt.core().unsubscribe(s.id, sub);
    s.set(TestValue::Int(2));
    // Only the push-on-subscribe DATA may have been counted: the repeated
    // unsubscribe must leave the sink detached, so the write above is unseen.
    assert_eq!(
        counter.load(Ordering::SeqCst),
        1,
        "sink must stay deregistered after a repeated unsubscribe"
    );
}
/// Compile-time check of the auto-trait surface of `SubscriptionId`.
#[test]
fn subscription_id_is_copy_send_sync() {
    // β/D225: the handle is a plain `SubscriptionId` newtype, so it must be
    // `Copy + Send + Sync` (no `Weak<C>`/RAII). This stands in for the
    // retired `Subscription: Send + Sync` static assertion.
    type Id = graphrefly_core::SubscriptionId;

    fn requires_copy<T>()
    where
        T: Copy,
    {
    }
    fn requires_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    // These calls only need to type-check; they do nothing at runtime.
    requires_copy::<Id>();
    requires_send_sync::<Id>();
}
/// Two sinks on one node are tracked separately: removing one must leave
/// the other receiving DATA.
#[test]
fn multiple_subscriptions_to_same_node_are_independent() {
    /// Read the current tally.
    fn seen(tally: &AtomicU32) -> u32 {
        tally.load(Ordering::SeqCst)
    }

    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(0)));
    let (first_tally, first_sub) = data_counter(&rt, state.id);
    let (second_tally, second_sub) = data_counter(&rt, state.id);

    // Each sink received the initial DATA when it subscribed.
    assert_eq!(seen(&first_tally), 1);
    assert_eq!(seen(&second_tally), 1);

    // Detach only the first sink, then write once.
    rt.core().unsubscribe(state.id, first_sub);
    state.set(TestValue::Int(1));

    assert_eq!(seen(&first_tally), 1, "sub1 detached");
    assert_eq!(seen(&second_tally), 2, "sub2 still live");

    rt.core().unsubscribe(state.id, second_sub);
}
/// Smoke test: dropping the runtime while a subscription is still
/// registered must not panic.
#[test]
fn runtime_drop_tears_down_remaining_subscriptions() {
    // β/D241: the binding-layer owner (`TestRuntime` in this harness) keeps
    // the Core on its thread and unsubscribes the subs it tracks from its own
    // `Drop`. That is the RAII convenience replacing core-level RAII.
    let rt = TestRuntime::new();
    let state = rt.state(Some(TestValue::Int(0)));

    // Deliberately left registered; the named binding keeps the recorder
    // alive until the end of the test, past the runtime drop below.
    let _recorder = rt.subscribe_recorder(state.id);
    state.set(TestValue::Int(1));

    // Tracked-sub teardown happens here on the owner thread; it must not panic.
    drop(rt);
}