liminal/channel/subscription.rs
1//! LIM-002 R2/R3: subscriptions backed by real beamr processes.
2//!
3//! Each subscription owns a real, scheduler-supervised beamr native process
4//! (a [`SubscriberProcess`]) plus the in-memory inbox the channel actor delivers
5//! matching envelopes into. The channel actor LINKS to this process's pid on
6//! `Subscribe`; when the [`SubscriptionHandle`] is dropped (or the caller
7//! unsubscribes) the process is terminated, the link fires an `{EXIT, pid, _}`
8//! signal, and the trapping channel actor removes the dead subscriber from its
9//! fan-out list. There is NO weak-Arc polling: liveness is observed structurally
10//! through the beamr link/EXIT path, exactly as the conversation actor observes
11//! its participants (`conversation/actor/beam.rs`).
12//!
13//! R3 predicates live INSIDE the channel actor process: a [`SubscriptionPredicate`]
14//! is a boxed `Fn(&Envelope) -> bool` owned by the actor's subscriber
15//! registration and evaluated at delivery time. This mirrors the participant
16//! `behaviour` pattern (a boxed trait object the process owns); for an in-memory
17//! ephemeral channel there is no need for a serialisable predicate, so a closure
18//! the actor holds is the simplest faithful design.
19
20use std::collections::VecDeque;
21use std::sync::{Arc, Mutex};
22
23use beamr::native::native_process::{NativeContext, NativeHandler, NativeOutcome};
24use beamr::process::ExitReason;
25use beamr::scheduler::Scheduler;
26use beamr::term::binary_ref::BinaryRef;
27
28use crate::channel::wire::decode_envelope;
29use crate::envelope::Envelope;
30use crate::error::LiminalError;
31
32/// In-memory inbox a subscriber receives delivered envelopes on.
33pub(crate) type SubscriberInbox = Arc<Mutex<VecDeque<Envelope>>>;
34
35/// A delivery predicate evaluated by the channel actor against each published
36/// envelope. `None` (no predicate) means deliver everything.
37pub(crate) type SubscriptionPredicate = Arc<dyn Fn(&Envelope) -> bool + Send + Sync>;
38
39/// Real beamr native process backing one subscription.
40///
41/// For LOCAL delivery it is an idle handler (mirroring
42/// `aion::worker::link::IdleWorkerProcess`): local envelopes travel through the
43/// shared [`SubscriberInbox`] the channel actor writes and
44/// [`SubscriptionHandle::try_next`] reads. Its other job is to BE a first-class
45/// linkable, killable process whose lifetime equals the subscription's, so the
46/// channel actor detects the subscription dying via a real EXIT signal rather
47/// than by polling a weak pointer.
48///
49/// For CROSS-NODE delivery (SRV-005) it is also the landing point for a remote
50/// publish: a remote node sends a published envelope, encoded by
51/// [`crate::channel::wire::encode_envelope`], as a single beamr binary directly
52/// to this process's pid (the pid the cluster registered in the channel's
53/// distributed process group). The binary lands in this process's mailbox; the
54/// handler decodes it back into an [`Envelope`] and pushes it onto the SAME
55/// inbox a local publish would, so a subscriber observes local and remote
56/// messages identically. Non-binary wakeups (trapped `{EXIT, _, _}` signals) are
57/// drained and ignored.
58struct SubscriberProcess {
59 inbox: SubscriberInbox,
60}
61
62impl NativeHandler for SubscriberProcess {
63 fn handle(&mut self, ctx: &mut NativeContext<'_>) -> NativeOutcome {
64 // Trapping is set authoritatively at spawn (see `SubscriptionHandle::spawn`)
65 // so it holds before the actor ever links — re-assert it here defensively
66 // for any future restart of this handler.
67 ctx.set_trap_exit(true);
68 // Drain every queued wakeup. A binary message is a remote envelope frame
69 // (SRV-005) to decode and enqueue; everything else (e.g. a trapped
70 // `{EXIT, _, _}` tuple from a crashed actor this subscriber outlives) is
71 // ignored. Death is driven only by an explicit `terminate_process` on
72 // unsubscribe/handle drop.
73 while let Some(message) = ctx.recv() {
74 if let Some(binary) = BinaryRef::new(message) {
75 self.accept_remote_frame(binary.as_bytes());
76 }
77 }
78 NativeOutcome::Wait
79 }
80}
81
82impl SubscriberProcess {
83 /// Decode a remote envelope frame and push it onto the inbox. A frame that
84 /// fails to decode is dropped: a corrupt cross-node payload must never crash
85 /// the subscriber or stall delivery of well-formed messages.
86 fn accept_remote_frame(&self, bytes: &[u8]) {
87 let Ok(envelope) = decode_envelope(bytes) else {
88 return;
89 };
90 if let Ok(mut inbox) = self.inbox.lock() {
91 inbox.push_back(envelope);
92 }
93 }
94}
95
96/// The actor-side record of one subscriber: the inbox to deliver into and the
97/// optional predicate to gate delivery. Held by the channel actor INSIDE its
98/// process, keyed by the subscriber process's pid.
99pub(crate) struct SubscriberRegistration {
100 pid: u64,
101 inbox: SubscriberInbox,
102 predicate: Option<SubscriptionPredicate>,
103}
104
105impl SubscriberRegistration {
106 pub(crate) const fn pid(&self) -> u64 {
107 self.pid
108 }
109
110 /// Delivers `envelope` to this subscriber when its predicate accepts it (or
111 /// it has no predicate). Returns `Ok(true)` when the envelope was pushed onto
112 /// the inbox, `Ok(false)` when a predicate filtered it out, and `Err` only
113 /// when the inbox lock is poisoned. The boolean lets the channel actor count
114 /// genuine deliveries for the delivery-ack signal.
115 pub(crate) fn deliver(&self, envelope: &Envelope) -> Result<bool, LiminalError> {
116 if let Some(predicate) = self.predicate.as_ref() {
117 if !predicate(envelope) {
118 return Ok(false);
119 }
120 }
121 self.inbox
122 .lock()
123 .map_err(|error| LiminalError::DeliveryFailed {
124 message: format!("subscriber inbox unavailable: {error}"),
125 })?
126 .push_back(envelope.clone());
127 Ok(true)
128 }
129}
130
131impl std::fmt::Debug for SubscriberRegistration {
132 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 formatter
134 .debug_struct("SubscriberRegistration")
135 .field("pid", &self.pid)
136 .field("has_predicate", &self.predicate.is_some())
137 .finish_non_exhaustive()
138 }
139}
140
141/// Handle returned by channel subscriptions for receiving validated envelopes.
142///
143/// Owns the subscriber's beamr pid, the shared inbox, and a clone of the
144/// scheduler so the process can be terminated when the subscription ends. The
145/// handle is the subscription's lifetime: dropping the last clone terminates the
146/// subscriber process, whose EXIT prunes the channel actor's fan-out list.
147#[derive(Clone)]
148pub struct SubscriptionHandle {
149 inner: Arc<SubscriptionInner>,
150}
151
152struct SubscriptionInner {
153 pid: u64,
154 inbox: SubscriberInbox,
155 scheduler: Arc<Scheduler>,
156}
157
158impl SubscriptionHandle {
159 /// Spawns a real subscriber process on `scheduler` and returns the handle
160 /// plus its actor-side registration record (carrying any predicate).
161 ///
162 /// # Errors
163 /// Returns [`LiminalError::SubscriptionFailed`] when the scheduler cannot
164 /// spawn the subscriber process.
165 pub(crate) fn spawn(
166 scheduler: &Arc<Scheduler>,
167 predicate: Option<SubscriptionPredicate>,
168 ) -> Result<(Self, SubscriberRegistration), LiminalError> {
169 let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
170 let process_inbox = Arc::clone(&inbox);
171 let factory = Box::new(move || {
172 Box::new(SubscriberProcess {
173 inbox: Arc::clone(&process_inbox),
174 }) as Box<dyn NativeHandler>
175 });
176 let pid =
177 scheduler
178 .spawn_native(factory)
179 .map_err(|error| LiminalError::SubscriptionFailed {
180 message: format!("failed to spawn subscriber process: {error:?}"),
181 })?;
182 // Set trap_exit BEFORE the channel actor links to this pid, so an
183 // abnormal actor crash is trapped (delivered as a message the subscriber
184 // drains) instead of cascading across the link and killing the
185 // subscriber. This makes the subscriber outlive a channel-actor crash so
186 // the restarted actor can re-link to it on boot (R2/R4). Setting it
187 // host-side (not in the handler's first slice) removes any race against
188 // the crash: the flag is in place the instant `subscribe` proceeds.
189 scheduler
190 .set_trap_exit(pid, true)
191 .map_err(|error| LiminalError::SubscriptionFailed {
192 message: format!("failed to set trap_exit on subscriber process {pid}: {error:?}"),
193 })?;
194 let handle = Self {
195 inner: Arc::new(SubscriptionInner {
196 pid,
197 inbox: Arc::clone(&inbox),
198 scheduler: Arc::clone(scheduler),
199 }),
200 };
201 let registration = SubscriberRegistration {
202 pid,
203 inbox,
204 predicate,
205 };
206 Ok((handle, registration))
207 }
208
209 /// Returns the beamr pid of the subscriber process this handle owns.
210 #[must_use]
211 pub(crate) fn pid(&self) -> u64 {
212 self.inner.pid
213 }
214
215 /// Attempts to receive the next delivered envelope without blocking.
216 ///
217 /// # Errors
218 ///
219 /// Returns [`LiminalError::SubscriptionFailed`] when the subscription inbox cannot be read.
220 pub fn try_next(&self) -> Result<Option<Envelope>, LiminalError> {
221 let mut messages =
222 self.inner
223 .inbox
224 .lock()
225 .map_err(|error| LiminalError::SubscriptionFailed {
226 message: format!("subscription inbox unavailable: {error}"),
227 })?;
228 Ok(messages.pop_front())
229 }
230}
231
232impl std::fmt::Debug for SubscriptionHandle {
233 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 formatter
235 .debug_struct("SubscriptionHandle")
236 .field("pid", &self.inner.pid)
237 .finish_non_exhaustive()
238 }
239}
240
241impl Drop for SubscriptionInner {
242 fn drop(&mut self) {
243 // Terminating the subscriber process fires the bidirectional link to the
244 // channel actor, which traps the EXIT and removes this subscriber from
245 // its fan-out list. This is the real-beamr unsubscribe-on-drop path.
246 self.scheduler
247 .terminate_process(self.pid, ExitReason::Normal);
248 }
249}
250
/// WR-9b: runs the REAL [`SubscriberProcess`] on beamr's cooperative
/// (single-threaded / wasm) [`beamr::scheduler::WasmScheduler`].
///
/// The point is to prove that the production subscriber handler — the very
/// `NativeHandler` the threaded [`SubscriptionHandle::spawn`] spawns — works
/// unchanged on the cooperative scheduler a browser host drives. Nothing is
/// faked: the test spawns the genuine [`SubscriberProcess`], hands it a genuine
/// [`crate::channel::wire::encode_envelope`] frame as a real beamr binary, pumps
/// cooperative `run_until_idle` turns, and checks that the handler's own
/// `accept_remote_frame` path decoded the envelope into the shared inbox a
/// [`SubscriptionHandle::try_next`] would read.
///
/// The handler runs cooperatively AS-IS because its `handle` only uses
/// platform-neutral [`NativeContext`] capabilities (`set_trap_exit`, `recv`),
/// [`BinaryRef`], and [`decode_envelope`] — none of which reach for threads,
/// tokio, sockets, or a `SharedState`. All the smoke adds is the cooperative
/// driver (spawn + owned-binary delivery + turn pump), which is exactly the
/// host-side seam the threaded `SubscriptionHandle`/channel-actor provide on
/// native.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod cooperative_smoke {
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;
    use std::sync::{Arc, Mutex};

    use beamr::atom::AtomTable;
    use beamr::ets::copy_term_to_ets;
    use beamr::module::ModuleRegistry;
    use beamr::native::BifRegistryImpl;
    use beamr::process::heap::Heap;
    use beamr::scheduler::WasmScheduler;
    use beamr::term::shared_binary::{SharedBinary, write_proc_bin};

    use super::{SubscriberInbox, SubscriberProcess};
    use crate::channel::SchemaId;
    use crate::channel::wire::encode_envelope;
    use crate::envelope::{Envelope, PublisherId};

    /// Creates a cooperative scheduler held the way a wasm host holds it: one
    /// `Rc<RefCell<…>>` living on a single thread.
    fn cooperative_scheduler() -> Rc<RefCell<WasmScheduler>> {
        let scheduler = WasmScheduler::new(
            Arc::new(AtomTable::with_common_atoms()),
            Arc::new(ModuleRegistry::new()),
            Arc::new(BifRegistryImpl::new()),
        );
        Rc::new(RefCell::new(scheduler))
    }

    /// Serialises `envelope` with the production wire codec and wraps the bytes
    /// as a heap-independent beamr binary term suitable for `send_owned`. This
    /// mirrors how a remote node hands a published frame to a subscriber pid
    /// (SRV-005).
    fn frame_as_owned_binary(envelope: &Envelope) -> beamr::ets::OwnedTerm {
        let shared = SharedBinary::new(encode_envelope(envelope));
        // A ProcBin reference occupies three heap words. It is written into a
        // scratch heap and then copied into ETS-owned memory, so the scratch
        // heap can be dropped before delivery.
        let mut scratch = Heap::new(8);
        let words = scratch
            .alloc_slice(3)
            .expect("scratch heap holds a proc-bin reference");
        let term = write_proc_bin(words, &shared).expect("proc-bin term writes");
        copy_term_to_ets(term).expect("frame copies into an owned binary")
    }

    /// Builds the fixed envelope the smoke publishes.
    fn sample_envelope() -> Envelope {
        // The timestamp is a whole number of milliseconds so that the round trip
        // through the wire codec (millisecond resolution, see `channel::wire`)
        // is exact. `Utc::now()` carries sub-millisecond precision that would be
        // truncated on decode, and that precision is irrelevant to what this
        // smoke proves.
        let timestamp = chrono::TimeZone::timestamp_millis_opt(&chrono::Utc, 1_700_000_000_123)
            .single()
            .expect("valid fixed millisecond timestamp");
        let payload = b"{\"value\":42}".to_vec();
        let schema = SchemaId::new();
        let publisher = PublisherId::from("publisher-cooperative");
        Envelope::with_timestamp(payload, None, schema, publisher, timestamp)
    }

    #[test]
    fn real_subscriber_process_delivers_a_published_envelope_cooperatively() {
        let scheduler = cooperative_scheduler();

        // Shared inbox the subscriber pushes decoded envelopes onto; it is the
        // same kind of queue the threaded `SubscriptionHandle::try_next` reads.
        let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
        let process_inbox = Arc::clone(&inbox);

        // Spawn the GENUINE production subscriber handler as a first-class
        // native process on the cooperative scheduler.
        let pid = scheduler.borrow_mut().spawn_native_root(Box::new(move || {
            Box::new(SubscriberProcess {
                inbox: Arc::clone(&process_inbox),
            }) as Box<dyn beamr::native::native_process::NativeHandler>
        }));

        // First turn: the handler runs once, asserts trap_exit, sees an empty
        // mailbox, and parks (`Wait`). Nothing has been published yet, so the
        // inbox must still be empty.
        scheduler.borrow_mut().run_until_idle();
        assert!(
            inbox.lock().expect("inbox lock").is_empty(),
            "no envelope is delivered before one is published"
        );

        // Publish: send a real encoded frame as a beamr binary straight to the
        // subscriber pid, exactly as a remote publish lands (SRV-005). This
        // wakes the parked process.
        let published = sample_envelope();
        let frame = frame_as_owned_binary(&published);
        scheduler
            .borrow_mut()
            .send_owned(pid, &frame)
            .expect("frame is delivered to the live subscriber pid");

        // Pump up to eight turns, stopping at the first one after which the
        // inbox yields an envelope. On such a turn the woken handler has drained
        // the binary, decoded it through its own `accept_remote_frame` path, and
        // pushed the envelope onto the inbox.
        let delivered = (0..8).find_map(|_| {
            scheduler.borrow_mut().run_until_idle();
            inbox.lock().expect("inbox lock").pop_front()
        });

        assert_eq!(
            delivered,
            Some(published),
            "the real subscriber decoded and delivered the published envelope"
        );
    }
}