Skip to main content

liminal/channel/
subscription.rs

//! LIM-002 R2/R3: subscriptions backed by real beamr processes.
//!
//! Each subscription owns a real, scheduler-supervised beamr native process
//! (a [`SubscriberProcess`]) plus the in-memory inbox into which the channel
//! actor delivers matching envelopes. The channel actor LINKS to this process's
//! pid on `Subscribe`; when the [`SubscriptionHandle`] is dropped (or the caller
//! unsubscribes) the process is terminated, the link fires an `{EXIT, pid, _}`
//! signal, and the trapping channel actor removes the dead subscriber from its
//! fan-out list. There is NO weak-Arc polling: liveness is observed structurally
//! through the beamr link/EXIT path, exactly as the conversation actor observes
//! its participants (`conversation/actor/beam.rs`).
//!
//! R3 predicates live INSIDE the channel actor process: a [`SubscriptionPredicate`]
//! is a shared (`Arc`-wrapped) `Fn(&Envelope) -> bool` owned by the actor's
//! subscriber registration and evaluated at delivery time. This mirrors the
//! participant `behaviour` pattern (a boxed trait object the process owns); for an
//! in-memory ephemeral channel there is no need for a serialisable predicate, so a
//! closure the actor holds is the simplest faithful design.

20use std::collections::VecDeque;
21use std::sync::{Arc, Mutex};
22
23use beamr::native::native_process::{NativeContext, NativeHandler, NativeOutcome};
24use beamr::process::ExitReason;
25use beamr::scheduler::Scheduler;
26use beamr::term::binary_ref::BinaryRef;
27
28use crate::channel::wire::decode_envelope;
29use crate::envelope::Envelope;
30use crate::error::LiminalError;
31
32/// In-memory inbox a subscriber receives delivered envelopes on.
33pub(crate) type SubscriberInbox = Arc<Mutex<VecDeque<Envelope>>>;
34
35/// A delivery predicate evaluated by the channel actor against each published
36/// envelope. `None` (no predicate) means deliver everything.
37pub(crate) type SubscriptionPredicate = Arc<dyn Fn(&Envelope) -> bool + Send + Sync>;
38
39/// Real beamr native process backing one subscription.
40///
41/// For LOCAL delivery it is an idle handler (mirroring
42/// `aion::worker::link::IdleWorkerProcess`): local envelopes travel through the
43/// shared [`SubscriberInbox`] the channel actor writes and
44/// [`SubscriptionHandle::try_next`] reads. Its other job is to BE a first-class
45/// linkable, killable process whose lifetime equals the subscription's, so the
46/// channel actor detects the subscription dying via a real EXIT signal rather
47/// than by polling a weak pointer.
48///
49/// For CROSS-NODE delivery (SRV-005) it is also the landing point for a remote
50/// publish: a remote node sends a published envelope, encoded by
51/// [`crate::channel::wire::encode_envelope`], as a single beamr binary directly
52/// to this process's pid (the pid the cluster registered in the channel's
53/// distributed process group). The binary lands in this process's mailbox; the
54/// handler decodes it back into an [`Envelope`] and pushes it onto the SAME
55/// inbox a local publish would, so a subscriber observes local and remote
56/// messages identically. Non-binary wakeups (trapped `{EXIT, _, _}` signals) are
57/// drained and ignored.
58struct SubscriberProcess {
59    inbox: SubscriberInbox,
60}
61
62impl NativeHandler for SubscriberProcess {
63    fn handle(&mut self, ctx: &mut NativeContext<'_>) -> NativeOutcome {
64        // Trapping is set authoritatively at spawn (see `SubscriptionHandle::spawn`)
65        // so it holds before the actor ever links — re-assert it here defensively
66        // for any future restart of this handler.
67        ctx.set_trap_exit(true);
68        // Drain every queued wakeup. A binary message is a remote envelope frame
69        // (SRV-005) to decode and enqueue; everything else (e.g. a trapped
70        // `{EXIT, _, _}` tuple from a crashed actor this subscriber outlives) is
71        // ignored. Death is driven only by an explicit `terminate_process` on
72        // unsubscribe/handle drop.
73        while let Some(message) = ctx.recv() {
74            if let Some(binary) = BinaryRef::new(message) {
75                self.accept_remote_frame(binary.as_bytes());
76            }
77        }
78        NativeOutcome::Wait
79    }
80}
81
82impl SubscriberProcess {
83    /// Decode a remote envelope frame and push it onto the inbox. A frame that
84    /// fails to decode is dropped: a corrupt cross-node payload must never crash
85    /// the subscriber or stall delivery of well-formed messages.
86    fn accept_remote_frame(&self, bytes: &[u8]) {
87        let Ok(envelope) = decode_envelope(bytes) else {
88            return;
89        };
90        if let Ok(mut inbox) = self.inbox.lock() {
91            inbox.push_back(envelope);
92        }
93    }
94}
95
96/// The actor-side record of one subscriber: the inbox to deliver into and the
97/// optional predicate to gate delivery. Held by the channel actor INSIDE its
98/// process, keyed by the subscriber process's pid.
99pub(crate) struct SubscriberRegistration {
100    pid: u64,
101    inbox: SubscriberInbox,
102    predicate: Option<SubscriptionPredicate>,
103}
104
105impl SubscriberRegistration {
106    pub(crate) const fn pid(&self) -> u64 {
107        self.pid
108    }
109
110    /// Delivers `envelope` to this subscriber when its predicate accepts it (or
111    /// it has no predicate). Returns `Ok(true)` when the envelope was pushed onto
112    /// the inbox, `Ok(false)` when a predicate filtered it out, and `Err` only
113    /// when the inbox lock is poisoned. The boolean lets the channel actor count
114    /// genuine deliveries for the delivery-ack signal.
115    pub(crate) fn deliver(&self, envelope: &Envelope) -> Result<bool, LiminalError> {
116        if let Some(predicate) = self.predicate.as_ref() {
117            if !predicate(envelope) {
118                return Ok(false);
119            }
120        }
121        self.inbox
122            .lock()
123            .map_err(|error| LiminalError::DeliveryFailed {
124                message: format!("subscriber inbox unavailable: {error}"),
125            })?
126            .push_back(envelope.clone());
127        Ok(true)
128    }
129}
130
131impl std::fmt::Debug for SubscriberRegistration {
132    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        formatter
134            .debug_struct("SubscriberRegistration")
135            .field("pid", &self.pid)
136            .field("has_predicate", &self.predicate.is_some())
137            .finish_non_exhaustive()
138    }
139}
140
141/// Handle returned by channel subscriptions for receiving validated envelopes.
142///
143/// Owns the subscriber's beamr pid, the shared inbox, and a clone of the
144/// scheduler so the process can be terminated when the subscription ends. The
145/// handle is the subscription's lifetime: dropping the last clone terminates the
146/// subscriber process, whose EXIT prunes the channel actor's fan-out list.
147#[derive(Clone)]
148pub struct SubscriptionHandle {
149    inner: Arc<SubscriptionInner>,
150}
151
152struct SubscriptionInner {
153    pid: u64,
154    inbox: SubscriberInbox,
155    scheduler: Arc<Scheduler>,
156}
157
158impl SubscriptionHandle {
159    /// Spawns a real subscriber process on `scheduler` and returns the handle
160    /// plus its actor-side registration record (carrying any predicate).
161    ///
162    /// # Errors
163    /// Returns [`LiminalError::SubscriptionFailed`] when the scheduler cannot
164    /// spawn the subscriber process.
165    pub(crate) fn spawn(
166        scheduler: &Arc<Scheduler>,
167        predicate: Option<SubscriptionPredicate>,
168    ) -> Result<(Self, SubscriberRegistration), LiminalError> {
169        let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
170        let process_inbox = Arc::clone(&inbox);
171        let factory = Box::new(move || {
172            Box::new(SubscriberProcess {
173                inbox: Arc::clone(&process_inbox),
174            }) as Box<dyn NativeHandler>
175        });
176        let pid =
177            scheduler
178                .spawn_native(factory)
179                .map_err(|error| LiminalError::SubscriptionFailed {
180                    message: format!("failed to spawn subscriber process: {error:?}"),
181                })?;
182        // Set trap_exit BEFORE the channel actor links to this pid, so an
183        // abnormal actor crash is trapped (delivered as a message the subscriber
184        // drains) instead of cascading across the link and killing the
185        // subscriber. This makes the subscriber outlive a channel-actor crash so
186        // the restarted actor can re-link to it on boot (R2/R4). Setting it
187        // host-side (not in the handler's first slice) removes any race against
188        // the crash: the flag is in place the instant `subscribe` proceeds.
189        scheduler
190            .set_trap_exit(pid, true)
191            .map_err(|error| LiminalError::SubscriptionFailed {
192                message: format!("failed to set trap_exit on subscriber process {pid}: {error:?}"),
193            })?;
194        let handle = Self {
195            inner: Arc::new(SubscriptionInner {
196                pid,
197                inbox: Arc::clone(&inbox),
198                scheduler: Arc::clone(scheduler),
199            }),
200        };
201        let registration = SubscriberRegistration {
202            pid,
203            inbox,
204            predicate,
205        };
206        Ok((handle, registration))
207    }
208
209    /// Returns the beamr pid of the subscriber process this handle owns.
210    #[must_use]
211    pub(crate) fn pid(&self) -> u64 {
212        self.inner.pid
213    }
214
215    /// Attempts to receive the next delivered envelope without blocking.
216    ///
217    /// # Errors
218    ///
219    /// Returns [`LiminalError::SubscriptionFailed`] when the subscription inbox cannot be read.
220    pub fn try_next(&self) -> Result<Option<Envelope>, LiminalError> {
221        let mut messages =
222            self.inner
223                .inbox
224                .lock()
225                .map_err(|error| LiminalError::SubscriptionFailed {
226                    message: format!("subscription inbox unavailable: {error}"),
227                })?;
228        Ok(messages.pop_front())
229    }
230}
231
232impl std::fmt::Debug for SubscriptionHandle {
233    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        formatter
235            .debug_struct("SubscriptionHandle")
236            .field("pid", &self.inner.pid)
237            .finish_non_exhaustive()
238    }
239}
240
241impl Drop for SubscriptionInner {
242    fn drop(&mut self) {
243        // Terminating the subscriber process fires the bidirectional link to the
244        // channel actor, which traps the EXIT and removes this subscriber from
245        // its fan-out list. This is the real-beamr unsubscribe-on-drop path.
246        self.scheduler
247            .terminate_process(self.pid, ExitReason::Normal);
248    }
249}
250
/// WR-9b: exercises the production [`SubscriberProcess`] on beamr's cooperative
/// (single-threaded / wasm) [`beamr::scheduler::WasmScheduler`].
///
/// The point is that the very `NativeHandler` spawned by the threaded
/// [`SubscriptionHandle::spawn`] runs unmodified under the cooperative
/// scheduler a browser host drives. Nothing is mocked. The test:
///
/// 1. spawns the real [`SubscriberProcess`];
/// 2. sends it a real [`crate::channel::wire::encode_envelope`] frame as a
///    beamr binary;
/// 3. pumps `run_until_idle` turns;
/// 4. checks that the handler's own `accept_remote_frame` path decoded the
///    frame into the shared inbox that [`SubscriptionHandle::try_next`] would
///    read.
///
/// No adaptation is needed because `handle` only uses platform-neutral pieces:
/// [`NativeContext`] (`set_trap_exit`, `recv`), [`BinaryRef`], and
/// [`decode_envelope`]. It uses no threads, tokio, sockets, or `SharedState`.
/// The only scaffolding here is the cooperative driver: spawn, owned-binary
/// delivery, and the turn pump. That mirrors the host-side seam the threaded
/// `SubscriptionHandle` and channel actor provide natively.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod cooperative_smoke {
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;
    use std::sync::{Arc, Mutex};

    use beamr::atom::AtomTable;
    use beamr::ets::copy_term_to_ets;
    use beamr::module::ModuleRegistry;
    use beamr::native::BifRegistryImpl;
    use beamr::process::heap::Heap;
    use beamr::scheduler::WasmScheduler;
    use beamr::term::shared_binary::{SharedBinary, write_proc_bin};

    use super::{SubscriberInbox, SubscriberProcess};
    use crate::channel::SchemaId;
    use crate::channel::wire::encode_envelope;
    use crate::envelope::{Envelope, PublisherId};

    /// Builds a cooperative scheduler held the way a wasm host holds one: a
    /// single `Rc<RefCell<…>>` living on one thread.
    fn cooperative_scheduler() -> Rc<RefCell<WasmScheduler>> {
        let atoms = Arc::new(AtomTable::with_common_atoms());
        let module_registry = Arc::new(ModuleRegistry::new());
        let bif_registry = Arc::new(BifRegistryImpl::new());
        let scheduler = WasmScheduler::new(atoms, module_registry, bif_registry);
        Rc::new(RefCell::new(scheduler))
    }

    /// Encodes `envelope` with the production wire codec and turns the bytes
    /// into a heap-independent beamr binary suitable for `send_owned`. This
    /// mirrors how a remote node hands a published frame to a subscriber pid
    /// (SRV-005).
    fn frame_as_owned_binary(envelope: &Envelope) -> beamr::ets::OwnedTerm {
        let shared = SharedBinary::new(encode_envelope(envelope));
        // A ProcBin reference occupies three heap words. Build it on a
        // throwaway heap, then copy it into ETS-owned memory so the throwaway
        // heap can go away before delivery.
        let mut scratch = Heap::new(8);
        let proc_bin_words = scratch
            .alloc_slice(3)
            .expect("scratch heap holds a proc-bin reference");
        let proc_bin = write_proc_bin(proc_bin_words, &shared).expect("proc-bin term writes");
        copy_term_to_ets(proc_bin).expect("frame copies into an owned binary")
    }

    /// A deterministic envelope whose timestamp is a whole millisecond. The
    /// wire codec carries millisecond resolution (see `channel::wire`), so a
    /// sub-millisecond `Utc::now()` would be truncated on decode and break the
    /// equality check for reasons unrelated to this smoke.
    fn sample_envelope() -> Envelope {
        let timestamp = chrono::TimeZone::timestamp_millis_opt(&chrono::Utc, 1_700_000_000_123)
            .single()
            .expect("valid fixed millisecond timestamp");
        let payload = b"{\"value\":42}".to_vec();
        let schema = SchemaId::new();
        let publisher = PublisherId::from("publisher-cooperative");
        Envelope::with_timestamp(payload, None, schema, publisher, timestamp)
    }

    #[test]
    fn real_subscriber_process_delivers_a_published_envelope_cooperatively() {
        let scheduler = cooperative_scheduler();

        // The handler appends decoded envelopes to this queue, which is the
        // same queue the threaded `SubscriptionHandle::try_next` drains.
        let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
        let handler_inbox = Arc::clone(&inbox);

        // Run the real production handler as a first-class native process.
        let pid = scheduler.borrow_mut().spawn_native_root(Box::new(move || {
            Box::new(SubscriberProcess {
                inbox: Arc::clone(&handler_inbox),
            }) as Box<dyn beamr::native::native_process::NativeHandler>
        }));

        // Initial turn: the handler sets trap_exit, sees an empty mailbox and
        // parks with `Wait`, so nothing can have been delivered yet.
        scheduler.borrow_mut().run_until_idle();
        let idle_inbox_is_empty = inbox.lock().expect("inbox lock").is_empty();
        assert!(
            idle_inbox_is_empty,
            "no envelope is delivered before one is published"
        );

        // Publish by sending the encoded frame as a binary directly to the
        // subscriber pid, the same way a remote publish arrives (SRV-005).
        // This wakes the parked process.
        let published = sample_envelope();
        let frame = frame_as_owned_binary(&published);
        scheduler
            .borrow_mut()
            .send_owned(pid, &frame)
            .expect("frame is delivered to the live subscriber pid");

        // Pump at most eight turns and stop at the first envelope that shows
        // up. The inbox guard is a temporary confined to each closure call.
        let delivered = (0..8).find_map(|_| {
            scheduler.borrow_mut().run_until_idle();
            inbox.lock().expect("inbox lock").pop_front()
        });

        assert_eq!(
            delivered.as_ref(),
            Some(&published),
            "the real subscriber decoded and delivered the published envelope"
        );
    }
}