use std::collections::HashMap;
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, Weak};
use beamr::atom::Atom;
use beamr::native::native_process::{NativeContext, NativeHandler, NativeOutcome};
use beamr::scheduler::Scheduler;
use super::actor::ActorCore;
use crate::conversation::types::ParticipantPid;
use crate::envelope::Envelope;
use crate::error::LiminalError;
pub trait ParticipantBehaviour: Send + Sync + std::fmt::Debug {
fn process(&self, request: &Envelope) -> Option<Envelope>;
}
#[derive(Clone, Copy, Debug, Default)]
pub struct EchoBehaviour;
impl ParticipantBehaviour for EchoBehaviour {
fn process(&self, request: &Envelope) -> Option<Envelope> {
Some(request.clone())
}
}
#[derive(Debug, Default)]
pub(super) struct ParticipantRuntime {
registrations: Mutex<HashMap<u64, ParticipantRegistration>>,
}
type RequestQueue = Arc<Mutex<VecDeque<Envelope>>>;
type ParticipantSnapshot = (RequestQueue, Arc<dyn ParticipantBehaviour>, Weak<ActorCore>);
struct ParticipantRegistration {
inbox: RequestQueue,
behaviour: Arc<dyn ParticipantBehaviour>,
core: Weak<ActorCore>,
}
impl std::fmt::Debug for ParticipantRegistration {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("ParticipantRegistration")
.field("behaviour", &self.behaviour)
.finish_non_exhaustive()
}
}
impl ParticipantRuntime {
pub(super) fn register(
&self,
pid: ParticipantPid,
inbox: RequestQueue,
behaviour: Arc<dyn ParticipantBehaviour>,
core: Weak<ActorCore>,
) -> Result<(), LiminalError> {
lock(&self.registrations)?.insert(
pid.get(),
ParticipantRegistration {
inbox,
behaviour,
core,
},
);
Ok(())
}
pub(super) fn deregister(&self, pid: ParticipantPid) {
if let Ok(mut registrations) = self.registrations.lock() {
registrations.remove(&pid.get());
}
}
fn run_slice(&self, pid: u64) -> usize {
let Some((inbox, behaviour, core)) = self.snapshot(pid) else {
return 0;
};
let mut processed = 0;
loop {
let request = {
let Ok(mut queue) = inbox.lock() else {
break;
};
queue.pop_front()
};
let Some(request) = request else { break };
processed += 1;
if let Some(reply) = behaviour.process(&request) {
if let Some(core) = core.upgrade() {
let _ = core.deliver_participant_reply(reply);
}
}
}
processed
}
fn snapshot(&self, pid: u64) -> Option<ParticipantSnapshot> {
let registrations = self.registrations.lock().ok()?;
let registration = registrations.get(&pid)?;
let snapshot = (
Arc::clone(®istration.inbox),
Arc::clone(®istration.behaviour),
registration.core.clone(),
);
drop(registrations);
Some(snapshot)
}
}
#[derive(Clone, Debug)]
pub(super) struct ParticipantChannel {
pid: ParticipantPid,
inbox: RequestQueue,
}
impl ParticipantChannel {
pub(super) const fn new(pid: ParticipantPid, inbox: RequestQueue) -> Self {
Self { pid, inbox }
}
pub(super) const fn pid(&self) -> ParticipantPid {
self.pid
}
pub(super) fn inbox_arc(&self) -> RequestQueue {
Arc::clone(&self.inbox)
}
pub(super) fn forward(
&self,
request: Envelope,
scheduler: &Scheduler,
wakeup_atom: Atom,
) -> Result<(), LiminalError> {
lock(&self.inbox)?.push_back(request);
if scheduler.enqueue_atom_message(self.pid.get(), wakeup_atom) {
Ok(())
} else {
lock(&self.inbox)?.pop_back();
Err(LiminalError::DeliveryFailed {
message: format!("conversation participant {} is not live", self.pid.get()),
})
}
}
}
pub(super) struct ParticipantProcess {
runtime: Arc<ParticipantRuntime>,
wakeup_atom: Atom,
}
impl ParticipantProcess {
pub(super) const fn new(runtime: Arc<ParticipantRuntime>, wakeup_atom: Atom) -> Self {
Self {
runtime,
wakeup_atom,
}
}
}
impl std::fmt::Debug for ParticipantProcess {
fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
formatter
.debug_struct("ParticipantProcess")
.field("wakeup_atom", &self.wakeup_atom)
.finish_non_exhaustive()
}
}
impl NativeHandler for ParticipantProcess {
fn handle(&mut self, ctx: &mut NativeContext<'_>) -> NativeOutcome {
let pid = ctx.self_pid();
while ctx.recv().is_some() {}
self.runtime.run_slice(pid);
NativeOutcome::Wait
}
}
fn lock<T>(mutex: &Mutex<T>) -> Result<std::sync::MutexGuard<'_, T>, LiminalError> {
mutex.lock().map_err(|error| LiminalError::DeliveryFailed {
message: format!("participant queue lock poisoned: {error}"),
})
}