use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use beamr::native::native_process::{NativeContext, NativeHandler, NativeOutcome};
use beamr::process::ExitReason;
use beamr::scheduler::Scheduler;
use beamr::term::binary_ref::BinaryRef;
use crate::channel::wire::decode_envelope;
use crate::envelope::Envelope;
use crate::error::LiminalError;
/// Shared, mutex-guarded FIFO queue of envelopes waiting to be read by a subscription.
pub(crate) type SubscriberInbox = Arc<Mutex<VecDeque<Envelope>>>;
/// Shared, thread-safe filter over envelopes; `deliver` only queues an envelope when the
/// predicate returns `true` for it.
pub(crate) type SubscriptionPredicate = Arc<dyn Fn(&Envelope) -> bool + Send + Sync>;
/// Native process handler that decodes binary frames it receives into envelopes and appends
/// them to a shared inbox.
struct SubscriberProcess {
/// Queue that successfully decoded envelopes are pushed onto.
inbox: SubscriberInbox,
}
impl NativeHandler for SubscriberProcess {
    /// Processes every message `ctx.recv()` currently yields, then returns
    /// [`NativeOutcome::Wait`].
    ///
    /// Messages that `BinaryRef::new` accepts are handed to `accept_remote_frame`; any other
    /// message is discarded without further action.
    fn handle(&mut self, ctx: &mut NativeContext<'_>) -> NativeOutcome {
        // NOTE(review): presumably makes exit signals arrive as ordinary messages instead of
        // killing this process — confirm against the beamr documentation.
        ctx.set_trap_exit(true);
        while let Some(term) = ctx.recv() {
            let Some(frame) = BinaryRef::new(term) else {
                // Not a binary term: nothing to decode.
                continue;
            };
            self.accept_remote_frame(frame.as_bytes());
        }
        NativeOutcome::Wait
    }
}
impl SubscriberProcess {
    /// Decodes `bytes` as an envelope and appends it to the inbox.
    ///
    /// This is best-effort: a frame that fails to decode, or an inbox whose lock cannot be
    /// acquired, causes the frame to be dropped silently.
    fn accept_remote_frame(&self, bytes: &[u8]) {
        let Ok(decoded) = decode_envelope(bytes) else {
            return;
        };
        let Ok(mut queue) = self.inbox.lock() else {
            return;
        };
        queue.push_back(decoded);
    }
}
/// Delivery-side view of a subscription: the subscriber's pid, its inbox, and an optional
/// filter applied before an envelope is queued.
pub(crate) struct SubscriberRegistration {
/// Pid of the subscriber process this registration belongs to.
pid: u64,
/// Queue that accepted envelopes are pushed onto.
inbox: SubscriberInbox,
/// Optional filter; when present, envelopes it rejects are not queued.
predicate: Option<SubscriptionPredicate>,
}
impl SubscriberRegistration {
    /// Returns the pid of the subscriber process this registration belongs to.
    pub(crate) const fn pid(&self) -> u64 {
        self.pid
    }

    /// Offers `envelope` to the subscriber.
    ///
    /// Returns `Ok(true)` when a clone of the envelope was appended to the inbox and
    /// `Ok(false)` when the registration's predicate rejected it. A registration without a
    /// predicate accepts every envelope.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::DeliveryFailed`] when the inbox lock cannot be acquired.
    pub(crate) fn deliver(&self, envelope: &Envelope) -> Result<bool, LiminalError> {
        let rejected = self
            .predicate
            .as_ref()
            .is_some_and(|accepts| !accepts(envelope));
        if rejected {
            return Ok(false);
        }

        let mut queue = self
            .inbox
            .lock()
            .map_err(|error| LiminalError::DeliveryFailed {
                message: format!("subscriber inbox unavailable: {error}"),
            })?;
        queue.push_back(envelope.clone());
        Ok(true)
    }
}
impl std::fmt::Debug for SubscriberRegistration {
    /// Formats the pid and whether a predicate is set; the inbox and the predicate itself are
    /// left out, which the non-exhaustive finish signals in the output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_predicate = self.predicate.is_some();
        let mut builder = formatter.debug_struct("SubscriberRegistration");
        builder.field("pid", &self.pid);
        builder.field("has_predicate", &has_predicate);
        builder.finish_non_exhaustive()
    }
}
/// Consumer-side handle to a subscription.
///
/// Cloning is cheap: every clone shares the same reference-counted `SubscriptionInner`.
#[derive(Clone)]
pub struct SubscriptionHandle {
/// Shared state; its `Drop` implementation runs once the last clone of the handle is dropped.
inner: Arc<SubscriptionInner>,
}
/// State shared by all clones of a `SubscriptionHandle`; dropping it terminates the subscriber
/// process through the stored scheduler.
struct SubscriptionInner {
/// Pid of the subscriber process.
pid: u64,
/// Queue from which the handle pops envelopes.
inbox: SubscriberInbox,
/// Scheduler used to terminate the subscriber process on drop.
scheduler: Arc<Scheduler>,
}
impl SubscriptionHandle {
    /// Spawns a subscriber process on `scheduler` and returns the consumer handle together with
    /// the registration used to deliver envelopes to it.
    ///
    /// The handle, the registration and the spawned process all share one inbox. `predicate`
    /// is stored on the registration only.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::SubscriptionFailed`] when the process cannot be spawned or when
    /// `trap_exit` cannot be enabled on it. In the latter case the freshly spawned process is
    /// terminated before the error is returned, so it does not outlive the failed call.
    pub(crate) fn spawn(
        scheduler: &Arc<Scheduler>,
        predicate: Option<SubscriptionPredicate>,
    ) -> Result<(Self, SubscriberRegistration), LiminalError> {
        let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
        let process_inbox = Arc::clone(&inbox);
        let factory = Box::new(move || {
            Box::new(SubscriberProcess {
                inbox: Arc::clone(&process_inbox),
            }) as Box<dyn NativeHandler>
        });
        let pid =
            scheduler
                .spawn_native(factory)
                .map_err(|error| LiminalError::SubscriptionFailed {
                    message: format!("failed to spawn subscriber process: {error:?}"),
                })?;

        // Take ownership of the process *before* the next fallible step. `SubscriptionInner`
        // terminates the process when dropped, so an early return below tears the process
        // down instead of leaking it with nothing left to reference its pid.
        let inner = Arc::new(SubscriptionInner {
            pid,
            inbox: Arc::clone(&inbox),
            scheduler: Arc::clone(scheduler),
        });

        scheduler
            .set_trap_exit(pid, true)
            .map_err(|error| LiminalError::SubscriptionFailed {
                message: format!("failed to set trap_exit on subscriber process {pid}: {error:?}"),
            })?;

        let registration = SubscriberRegistration {
            pid,
            inbox,
            predicate,
        };
        Ok((Self { inner }, registration))
    }

    /// Returns the pid of the subscriber process backing this handle.
    #[must_use]
    pub(crate) fn pid(&self) -> u64 {
        self.inner.pid
    }

    /// Pops the oldest queued envelope without blocking for new ones.
    ///
    /// Returns `Ok(None)` when the inbox is currently empty.
    ///
    /// # Errors
    ///
    /// Returns [`LiminalError::SubscriptionFailed`] when the inbox lock cannot be acquired.
    pub fn try_next(&self) -> Result<Option<Envelope>, LiminalError> {
        let mut messages =
            self.inner
                .inbox
                .lock()
                .map_err(|error| LiminalError::SubscriptionFailed {
                    message: format!("subscription inbox unavailable: {error}"),
                })?;
        Ok(messages.pop_front())
    }
}
impl std::fmt::Debug for SubscriptionHandle {
    /// Formats only the subscriber pid; the remaining shared state is left out, which the
    /// non-exhaustive finish signals in the output.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("SubscriptionHandle");
        builder.field("pid", &self.inner.pid);
        builder.finish_non_exhaustive()
    }
}
impl Drop for SubscriptionInner {
    /// Asks the scheduler to terminate the subscriber process with a normal exit reason.
    fn drop(&mut self) {
        let Self { pid, scheduler, .. } = self;
        scheduler.terminate_process(*pid, ExitReason::Normal);
    }
}
/// Smoke test that runs the real `SubscriberProcess` on a `WasmScheduler` driven by hand
/// through `run_until_idle`.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod cooperative_smoke {
    use std::cell::RefCell;
    use std::collections::VecDeque;
    use std::rc::Rc;
    use std::sync::{Arc, Mutex};

    use beamr::atom::AtomTable;
    use beamr::ets::copy_term_to_ets;
    use beamr::module::ModuleRegistry;
    use beamr::native::BifRegistryImpl;
    use beamr::process::heap::Heap;
    use beamr::scheduler::WasmScheduler;
    use beamr::term::shared_binary::{SharedBinary, write_proc_bin};

    use super::{SubscriberInbox, SubscriberProcess};
    use crate::channel::SchemaId;
    use crate::channel::wire::encode_envelope;
    use crate::envelope::{Envelope, PublisherId};

    /// Builds a fresh `WasmScheduler` with the common atoms, an empty module registry and a
    /// new BIF registry.
    fn cooperative_scheduler() -> Rc<RefCell<WasmScheduler>> {
        let scheduler = WasmScheduler::new(
            Arc::new(AtomTable::with_common_atoms()),
            Arc::new(ModuleRegistry::new()),
            Arc::new(BifRegistryImpl::new()),
        );
        Rc::new(RefCell::new(scheduler))
    }

    /// Encodes `envelope` to its wire form and wraps the bytes in an owned binary term that
    /// can be sent to a process.
    fn frame_as_owned_binary(envelope: &Envelope) -> beamr::ets::OwnedTerm {
        let shared = SharedBinary::new(encode_envelope(envelope));
        // Scratch heap that only has to hold the proc-bin reference words.
        let mut scratch = Heap::new(8);
        let slots = scratch
            .alloc_slice(3)
            .expect("scratch heap holds a proc-bin reference");
        let proc_bin = write_proc_bin(slots, &shared).expect("proc-bin term writes");
        copy_term_to_ets(proc_bin).expect("frame copies into an owned binary")
    }

    /// Returns a deterministic envelope with a fixed millisecond timestamp.
    fn sample_envelope() -> Envelope {
        let fixed_instant =
            chrono::TimeZone::timestamp_millis_opt(&chrono::Utc, 1_700_000_000_123)
                .single()
                .expect("valid fixed millisecond timestamp");
        Envelope::with_timestamp(
            b"{\"value\":42}".to_vec(),
            None,
            SchemaId::new(),
            PublisherId::from("publisher-cooperative"),
            fixed_instant,
        )
    }

    #[test]
    fn real_subscriber_process_delivers_a_published_envelope_cooperatively() {
        let scheduler = cooperative_scheduler();
        let inbox: SubscriberInbox = Arc::new(Mutex::new(VecDeque::new()));
        let process_inbox = Arc::clone(&inbox);
        let pid = scheduler.borrow_mut().spawn_native_root(Box::new(move || {
            Box::new(SubscriberProcess {
                inbox: Arc::clone(&process_inbox),
            }) as Box<dyn beamr::native::native_process::NativeHandler>
        }));

        // Let the freshly spawned process run once before anything is published.
        scheduler.borrow_mut().run_until_idle();
        let inbox_is_empty = inbox.lock().expect("inbox lock").is_empty();
        assert!(
            inbox_is_empty,
            "no envelope is delivered before one is published"
        );

        let published = sample_envelope();
        let frame = frame_as_owned_binary(&published);
        scheduler
            .borrow_mut()
            .send_owned(pid, &frame)
            .expect("frame is delivered to the live subscriber pid");

        // Drive the scheduler a bounded number of times, stopping at the first envelope that
        // shows up in the inbox.
        let delivered = (0..8).find_map(|_| {
            scheduler.borrow_mut().run_until_idle();
            inbox.lock().expect("inbox lock").pop_front()
        });

        assert_eq!(
            delivered.as_ref(),
            Some(&published),
            "the real subscriber decoded and delivered the published envelope"
        );
    }
}