1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
//! Shared dynamic-link state for every [`super::super::Linker`] clone.
//!
//! Owns [`LinkerState`] behind [`RwLock`], the topology [`TopologyCoordinator`], and the
//! `dl_operation_pending` [`AtomicBool`] used to coordinate [`LinkerShared::synchronize_link_operation`]
//! with followers. Prefer these helpers over raw lock calls — lock ordering and the “never block
//! on `write()` without cooperating” invariant are spelled out on [`super`] (the linker `sync`
//! module).
use std::{
ops::Deref,
sync::{
Arc, Barrier, RwLock, RwLockReadGuard, RwLockWriteGuard,
atomic::{AtomicBool, Ordering},
},
};
use tracing::trace;
use wasmer::{AsStoreMut, FunctionEnv, FunctionEnvMut};
use crate::{WasiEnv, WasiProcess, WasiThreadId};
use super::super::{InstanceGroupState, LinkError, LinkerState};
use super::{
DlOperation, LinkerStateWriteBackoff,
topology_lock::{TopologyCoordinator, TopologyToken},
};
/// Shared linkage and synchronization primitives for every [`super::super::Linker`] handle.
///
/// Cloning is cheap (`Arc`-backed locks and coordinators); clone when an instance-group handle
/// outlives a particular stack frame but must keep talking to the same dynamic-link universe.
#[derive(Clone)]
pub(in crate::state::linker) struct LinkerShared {
/// Global module tables, buses, … — see [`LinkerState`].
linker_state: Arc<RwLock<LinkerState>>,
/// [`TopologyCoordinator`] embedded with this linker — guards topology-changing phases.
///
/// At most **one** active [`TopologyToken`](TopologyToken) may exist cluster-wide while any
/// topology mutation sequence is underway.
topology_coordinator: TopologyCoordinator,
/// Set during [`LinkerShared::synchronize_link_operation`] so syscall paths / cooperative writers
/// can enter [`LinkerShared::do_pending_link_operations_internal`].
dl_operation_pending: Arc<AtomicBool>,
}
impl LinkerShared {
/// Wraps freshly constructed [`LinkerState`] for the owning process/module tree (initially only
/// the main [`super::super::Linker::new`] path).
pub(in crate::state::linker) fn new(linker_state: LinkerState) -> Self {
Self {
linker_state: Arc::new(RwLock::new(linker_state)),
topology_coordinator: TopologyCoordinator::new(),
dl_operation_pending: Arc::new(AtomicBool::new(false)),
}
}
/// Panics unless both DL buses have exactly one receiver — validates main-group bootstrap before
/// exclusive writes (see [`Self::bootstrap_exclusive_write_then`]).
fn assert_exactly_one_dl_bus_subscriber(ls: &LinkerState) {
let op_rx = ls.send_pending_operation.rx_count();
let barrier_rx = ls.send_pending_operation_barrier.rx_count();
if op_rx != 1 || barrier_rx != 1 {
panic!(
"wasix linker bootstrap invariant violated: expected exactly one DL bus subscriber \
on each sender (pending_operation rx={op_rx}, barrier rx={barrier_rx}); \
`LinkerShared::bootstrap_exclusive_write_then` must only run during main \
`Linker::new` finalization before additional instance groups attach receivers"
);
}
}
/// Exclusive [`LinkerState`] write for main linker bootstrap only.
///
/// # Safety
///
/// Must run only while exactly one instance group has subscribed to both DL buses (verified
/// after the lock is taken — mismatch panics in release builds). Caller must respect instance-group /
/// linker lock ordering used in [`super::super::Linker::new`].
pub(in crate::state::linker) unsafe fn bootstrap_exclusive_write_then<R>(
&self,
f: impl FnOnce(&mut LinkerState) -> R,
) -> R {
let mut guard = self.linker_state.write().unwrap();
Self::assert_exactly_one_dl_bus_subscriber(&guard);
f(&mut guard)
}
/// Non-blocking `try_write` on [`LinkerState`].
///
/// Used sparingly where blocking would recurse into the linker (stub paths, best-effort
/// resolution). Prefer [`Self::write_linker_state`] for normal cooperative writes.
pub(in crate::state::linker) fn try_write_linker_state(
&self,
) -> Result<
RwLockWriteGuard<'_, LinkerState>,
std::sync::TryLockError<RwLockWriteGuard<'_, LinkerState>>,
> {
self.linker_state.try_write()
}
/// Non-blocking `try_read` on [`LinkerState`].
pub(in crate::state::linker) fn try_read_linker_state(
&self,
) -> Result<
RwLockReadGuard<'_, LinkerState>,
std::sync::TryLockError<RwLockReadGuard<'_, LinkerState>>,
> {
self.linker_state.try_read()
}
/// Locks [`LinkerState`] for write using repeated `try_write` plus cooperative draining of
/// pending dynamic-link replay and [`LinkerStateWriteBackoff`].
///
/// Prefer this over naked [`RwLock::write`] / blocking `write()` from instance-group linker
/// paths: another OS thread might hold the write lock while follower groups rendezvous at a DL
/// barrier waiting for **this** thread to run [`Self::do_pending_link_operations_internal`].
pub(in crate::state::linker) fn write_linker_state(
&self,
group_state: &mut InstanceGroupState,
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
) -> Result<RwLockWriteGuard<'_, LinkerState>, LinkError> {
let mut linker_write_backoff = LinkerStateWriteBackoff::new();
loop {
match self.linker_state.try_write() {
Ok(guard) => return Ok(guard),
Err(std::sync::TryLockError::WouldBlock) => {
linker_write_backoff.backoff();
let env = ctx.as_ref();
let mut store = ctx.as_store_mut();
self.do_pending_link_operations_internal(group_state, &mut store, &env)?;
}
Err(std::sync::TryLockError::Poisoned(_)) => {
panic!("The linker state's lock is poisoned");
}
}
}
}
/// [`TopologyCoordinator::try_acquire`] loop with [`LinkerStateWriteBackoff`] plus cooperative drains
/// of [`Self::do_pending_link_operations_internal`].
///
/// **Lock ordering**: topology must be leased **before** taking [`LinkerState`] for write paths that
/// change replicated topology (spawn prepare, guarded loads, [`super::super::Linker::resolve_export`],
/// etc.).
///
/// `prepare_for_instance_group` is the motivating case — the parent attaches no new subscribers until
/// the child finalizes while still holding this token handed across threads.
pub(in crate::state::linker) fn acquire_topology_token(
&self,
group_state: &mut InstanceGroupState,
store: &mut impl AsStoreMut,
env: &FunctionEnv<WasiEnv>,
) -> Result<TopologyToken, LinkError> {
let mut backoff = LinkerStateWriteBackoff::new();
loop {
if let Some(t) = self.topology_coordinator.try_acquire() {
return Ok(t);
}
backoff.backoff();
self.do_pending_link_operations_internal(group_state, store, env)?;
}
}
/// Blocking [`RwLock`] write once a [`TopologyToken`] is already held (spawn finalization —
/// e.g. [`super::super::Linker::create_instance_group`]).
///
/// Returns `(token, guard)` — drop the **`guard`** before **`token`** to avoid extending the write
/// critical section beyond topology decisions.
pub(in crate::state::linker) fn write_linker_state_blocking_holding_topology(
&self,
topology: TopologyToken,
) -> (TopologyToken, RwLockWriteGuard<'_, LinkerState>) {
let linker_state_write_guard = self.linker_state.write().unwrap();
(topology, linker_state_write_guard)
}
/// Acquires topology (see [`Self::acquire_topology_token`]), then takes a blocking write lock via
/// [`Self::write_linker_state_blocking_holding_topology`].
///
/// Use this for paths that mutate [`LinkerState`] under the topology coordinator’s single-writer
/// umbrella when the lease was **not** already taken elsewhere.
pub(in crate::state::linker) fn write_linker_state_with_topology(
&self,
group_state: &mut InstanceGroupState,
ctx: &mut FunctionEnvMut<'_, WasiEnv>,
) -> Result<(TopologyToken, RwLockWriteGuard<'_, LinkerState>), LinkError> {
let env = ctx.as_ref();
let mut store = ctx.as_store_mut();
let token = self.acquire_topology_token(group_state, &mut store, &env)?;
Ok(self.write_linker_state_blocking_holding_topology(token))
}
/// Broadcasts [`DlOperation`] `op` to every instance-group receiver then waits for replay.
///
/// Contracts:
///
/// * `topology` must already belong to **this** instigating flow and was leased **before**
/// exclusive access to buses / tables was acquired.
/// * `linker_state_write_lock` guards bus broadcast invariants (`try_broadcast` must succeed).
/// * Recoverable semantic failures are surfaced by callers; panics here are always fatal —
/// bus capacity misuse or missed rendezvous implies we cannot reconcile groups.
/// * Drops `topology` when done (`num_groups <= 1`) or after the follower completion barrier.
pub(in crate::state::linker) fn synchronize_link_operation(
&self,
topology: TopologyToken,
op: DlOperation,
mut linker_state_write_lock: RwLockWriteGuard<'_, LinkerState>,
group_state: &mut InstanceGroupState,
wasi_process: &WasiProcess,
self_thread_id: WasiThreadId,
) {
trace!(?op, "Synchronizing link operation");
let num_groups = linker_state_write_lock.send_pending_operation.rx_count();
if num_groups <= 1 {
trace!("No other living instance groups, nothing to do");
drop(topology);
return;
}
let barrier = Arc::new(Barrier::new(num_groups));
// Single-flight barrier envelope (bus depth is one intentionally).
if linker_state_write_lock
.send_pending_operation_barrier
.try_broadcast(barrier.clone())
.is_err()
{
panic!("Internal error: more than one synchronized link operation active")
}
// Wake followers so syscall paths re-enter cooperative DL helpers promptly.
self.dl_operation_pending.store(true, Ordering::SeqCst);
trace!("Signalling wasix threads to wake up");
for thread in wasi_process
.all_threads()
.into_iter()
.filter(|tid| *tid != self_thread_id)
{
wasi_process.signal_thread(&thread, wasmer_wasix_types::wasi::Signal::Sigwakeup);
}
trace!(%num_groups, "Waiting at barrier");
barrier.wait();
trace!("All threads now processing dl op");
// Everyone saw [`Self::dl_operation_pending_load`] and will drive `recv` paths.
self.dl_operation_pending.store(false, Ordering::SeqCst);
// Still under write lock: publish the replicated command before releasing exclusivity.
if linker_state_write_lock
.send_pending_operation
.try_broadcast(op.clone())
.is_err()
{
panic!("Internal error: more than one synchronized link operation active")
}
// Downgrade to shared read while followers apply (`apply_dl_operation`); no topology writer
// should race between the barrier epochs.
trace!("Unlocking linker state");
drop(linker_state_write_lock);
let linker_state_read_lock = self.linker_state.read().unwrap();
// Drain local bus copies — frees mailbox capacity before the follower epoch completes.
_ = group_state.recv_pending_operation_barrier.recv().unwrap();
_ = group_state.recv_pending_operation.recv().unwrap();
// Second rendezvous guarantees everyone finished before another writer can preempt read-only
// application (see linker `sync` module discussion).
trace!("Waiting for other threads to finish processing the dl op");
barrier.wait();
drop(linker_state_read_lock);
drop(topology);
trace!("Synchronization complete");
}
/// Peek at the cooperative-DL handshake flag `dl_operation_pending` with arbitrary memory
/// ordering.
///
/// Prefer [`Ordering::SeqCst`] (`fast = false` in callers) whenever another thread waking from
/// `Sigwakeup` must reliably observe transitions; relaxed loads are intentionally lossy —
/// safe only when callers will retry promptly on their own syscall boundaries.
pub(in crate::state::linker) fn dl_operation_pending_load(&self, ordering: Ordering) -> bool {
self.dl_operation_pending.load(ordering)
}
/// Follow half of [`Self::synchronize_link_operation`] — participates in barriers, consumes the broadcast
/// [`DlOperation`], and applies `op` to `group_state` under [`LinkerState`] read access.
///
/// Intended for callers that already skipped the idle fast path (cheap load of
/// `dl_operation_pending`) yet still need deterministic rendezvous semantics.
///
/// # Panics
///
/// Missing receivers / malformed bus state panic — those are irrecoverable and indicate we lost
/// synchronization with subscribers.
pub(in crate::state::linker) fn do_pending_link_operations_internal(
&self,
group_state: &mut InstanceGroupState,
store: &mut impl AsStoreMut,
env: &FunctionEnv<WasiEnv>,
) -> Result<(), LinkError> {
if !self.dl_operation_pending.load(Ordering::SeqCst) {
return Ok(());
}
trace!("Pending link operation discovered, will process");
let barrier = group_state.recv_pending_operation_barrier.recv().expect(
"Failed to receive barrier while a DL operation was \
in progress; this condition can't be recovered from",
);
barrier.wait();
trace!("Past the barrier, now processing operation");
// Barrier epoch complete — instigator downgraded writer→reader earlier, so follower reads OK.
let op = group_state.recv_pending_operation.recv().unwrap();
let linker_state = self.linker_state.read().unwrap();
let result = group_state.apply_dl_operation(linker_state.deref(), op, store, env);
trace!("Operation applied, now waiting at second barrier");
// Rendezvous again so nobody leaves while others still mutate stores / tables concurrently.
barrier.wait();
drop(linker_state);
trace!("Pending link operation applied successfully");
result
}
}