use std::collections::VecDeque;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex, mpsc};
use std::time::Instant;
use beamr::atom::Atom;
use beamr::native::ProcessContext;
use beamr::term::Term;
use super::beam;
use super::exit::ExitNotifierRegistry;
use super::queue::{QueuedCommand, QueuedCommandKind};
use super::sync::{self, lock, send_reply, wait_for};
use super::{ParticipantChannel, SupervisorInner};
use crate::conversation::types::{
ConversationConfig, ConversationPhase, ConversationState, CrashPolicy, ParticipantPid,
};
use crate::envelope::Envelope;
use crate::error::LiminalError;
/// Shared core of a conversation actor: the conversation state together with the
/// queues used to pass commands and envelopes between callers and the actor process.
pub struct ActorCore {
/// Supervisor internals (scheduler, runtime, participant runtime) used by the methods.
supervisor: Arc<SupervisorInner>,
/// Configuration the initial conversation state was built from.
pub(super) config: ConversationConfig,
/// Current conversation state; every access goes through this mutex.
state: Mutex<ConversationState>,
/// Envelopes that arrived while no receive was pending, oldest first.
inbox: Mutex<VecDeque<Envelope>>,
/// Reply channels of receives still waiting for an envelope, oldest first.
pending_receives: Mutex<VecDeque<mpsc::SyncSender<Result<Envelope, LiminalError>>>>,
/// Commands waiting to be popped by `process_next_command`, oldest first.
commands: Mutex<VecDeque<QueuedCommand>>,
/// Pid most recently stored through `set_current_pid`, if any.
current_pid: Mutex<Option<ParticipantPid>>,
/// Held by `ensure_running` across its liveness check and spawn.
restart_lock: Mutex<()>,
/// Next id to hand out to a queued command; initialised to 1.
next_command_id: AtomicU64,
/// Registry of channels signalled with the observation time when a participant exits.
exit_notifiers: ExitNotifierRegistry,
/// Participant channels; `apply_send` forwards to the first one when present.
participant_channels: Vec<ParticipantChannel>,
}
/// Diagnostic formatting that shows only the configuration and the current pid;
/// the remaining fields are elided (non-exhaustive output).
impl std::fmt::Debug for ActorCore {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("ActorCore");
        builder.field("config", &self.config);
        // A poisoned pid mutex is rendered as `None` rather than failing the format.
        let pid_guard = self.current_pid.lock().ok();
        builder.field("current_pid", &pid_guard);
        builder.finish_non_exhaustive()
    }
}
impl ActorCore {
pub(super) fn new(
supervisor: Arc<SupervisorInner>,
config: ConversationConfig,
participant_channels: Vec<ParticipantChannel>,
) -> Self {
let state = ConversationState::from_config(&config, Instant::now());
Self {
supervisor,
config,
state: Mutex::new(state),
inbox: Mutex::new(VecDeque::new()),
pending_receives: Mutex::new(VecDeque::new()),
commands: Mutex::new(VecDeque::new()),
current_pid: Mutex::new(None),
restart_lock: Mutex::new(()),
next_command_id: AtomicU64::new(1),
exit_notifiers: ExitNotifierRegistry::default(),
participant_channels,
}
}
/// Returns the pid of a live actor process for this conversation, spawning one
/// through the supervisor when the stored pid is absent or no longer in the
/// scheduler's process table.
///
/// # Errors
/// Returns `LiminalError::ConversationFailed` when the conversation is closed, and
/// propagates lock and spawn errors.
pub(super) fn ensure_running(self: &Arc<Self>) -> Result<ParticipantPid, LiminalError> {
    // Held until return so the liveness check and the spawn happen under one guard.
    let _restart_guard = lock(&self.restart_lock, "actor restart")?;
    if self.is_closed()? {
        return Err(LiminalError::ConversationFailed {
            message: "conversation is closed".to_owned(),
        });
    }
    let known_pid = *lock(&self.current_pid, "actor pid")?;
    let live_pid = known_pid.filter(|pid| {
        let table = self.supervisor.scheduler.process_table();
        table.get(pid.get()).is_some()
    });
    match live_pid {
        Some(pid) => Ok(pid),
        None => self.supervisor.spawn_actor_for(self),
    }
}
/// Stores `pid` as the actor's current pid, replacing any previous value.
///
/// # Errors
/// Propagates the error from acquiring the pid lock.
pub(super) fn set_current_pid(&self, pid: ParticipantPid) -> Result<(), LiminalError> {
    let mut slot = lock(&self.current_pid, "actor pid")?;
    *slot = Some(pid);
    Ok(())
}
/// Queues a `Boot` command for the actor at `pid` and waits for its reply.
///
/// # Errors
/// Propagates enqueue failures and whatever the boot reply or the wait reports.
pub(super) fn boot(self: &Arc<Self>, pid: ParticipantPid) -> Result<(), LiminalError> {
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    let command = QueuedCommandKind::Boot { reply: reply_tx };
    self.enqueue_for_pid(pid, command)?;
    wait_for(&reply_rx, "conversation actor boot")
}
/// Ensures the actor is running, queues a `Send` command carrying `message`, and
/// waits for the actor's reply.
///
/// # Errors
/// Propagates errors from `ensure_running`, from enqueueing, and from the reply.
pub(super) fn submit_send(self: &Arc<Self>, message: Envelope) -> Result<(), LiminalError> {
    let actor_pid = self.ensure_running()?;
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    let command = QueuedCommandKind::Send {
        message,
        reply: reply_tx,
    };
    self.enqueue_for_pid(actor_pid, command)?;
    wait_for(&reply_rx, "conversation send")
}
/// Ensures the actor is running, queues a `Receive` command, and waits for the
/// envelope (or error) the actor replies with.
///
/// # Errors
/// Propagates errors from `ensure_running`, from enqueueing, and from the reply.
pub(super) fn submit_receive(self: &Arc<Self>) -> Result<Envelope, LiminalError> {
    let actor_pid = self.ensure_running()?;
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    let command = QueuedCommandKind::Receive { reply: reply_tx };
    self.enqueue_for_pid(actor_pid, command)?;
    wait_for(&reply_rx, "conversation receive")
}
/// Like `submit_receive`, but waits for the reply through `sync::wait_for_timeout`
/// with the given `timeout`.
///
/// # Errors
/// Propagates errors from `ensure_running`, from enqueueing, and from the timed wait.
// NOTE(review): this function does not withdraw the queued `reply` sender when the
// wait gives up; an envelope later handed to that sender would go to a dropped
// receiver. Confirm this is handled elsewhere.
pub(super) fn submit_receive_timeout(
    self: &Arc<Self>,
    timeout: std::time::Duration,
) -> Result<Envelope, LiminalError> {
    let actor_pid = self.ensure_running()?;
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    let command = QueuedCommandKind::Receive { reply: reply_tx };
    self.enqueue_for_pid(actor_pid, command)?;
    sync::wait_for_timeout(&reply_rx, "conversation receive", timeout)
}
/// Ensures the actor is running, queues a `Close` command, and waits for its reply.
///
/// # Errors
/// Propagates errors from `ensure_running` (including the "closed" error when the
/// conversation is already closed), from enqueueing, and from the reply.
pub(super) fn submit_close(self: &Arc<Self>) -> Result<(), LiminalError> {
    let actor_pid = self.ensure_running()?;
    let (reply_tx, reply_rx) = mpsc::sync_channel(1);
    let command = QueuedCommandKind::Close { reply: reply_tx };
    self.enqueue_for_pid(actor_pid, command)?;
    wait_for(&reply_rx, "conversation close")
}
/// Returns a copy of the conversation state.
///
/// A closed conversation is answered directly from the local snapshot; otherwise a
/// `QueryState` command is queued and the actor's reply is awaited.
///
/// # Errors
/// Propagates lock errors, `ensure_running` errors, enqueue failures, and the reply.
pub(super) fn submit_query_state(self: &Arc<Self>) -> Result<ConversationState, LiminalError> {
    let closed = self.is_closed()?;
    if !closed {
        let actor_pid = self.ensure_running()?;
        let (reply_tx, reply_rx) = mpsc::sync_channel(1);
        let command = QueuedCommandKind::QueryState { reply: reply_tx };
        self.enqueue_for_pid(actor_pid, command)?;
        return wait_for(&reply_rx, "conversation state query");
    }
    self.snapshot()
}
/// Appends a command of the given `kind` to the command queue under a fresh id and
/// sends the runtime's command atom to `pid` through the scheduler.
///
/// If the scheduler reports the atom message was not enqueued, the command is
/// removed from the queue again.
///
/// # Errors
/// Returns `LiminalError::DeliveryFailed` when the atom message could not be
/// enqueued, and propagates lock errors.
fn enqueue_for_pid(
    &self,
    pid: ParticipantPid,
    kind: QueuedCommandKind,
) -> Result<(), LiminalError> {
    let id = self.next_command_id.fetch_add(1, Ordering::Relaxed);
    {
        let mut queue = lock(&self.commands, "actor command queue")?;
        queue.push_back(QueuedCommand { id, kind });
    }
    let target = pid.get();
    let wakeup = self.supervisor.runtime.command_atom();
    let delivered = self.supervisor.scheduler.enqueue_atom_message(target, wakeup);
    if delivered {
        return Ok(());
    }
    // The wakeup never reached the actor, so take the command back out.
    self.remove_command(id)?;
    Err(LiminalError::DeliveryFailed {
        message: format!("conversation actor pid {} is not live", pid.get()),
    })
}
/// Drops every queued command whose id equals `id`.
///
/// # Errors
/// Propagates the error from acquiring the command-queue lock.
fn remove_command(&self, id: u64) -> Result<(), LiminalError> {
    let mut queue = lock(&self.commands, "actor command queue")?;
    queue.retain(|queued| queued.id != id);
    Ok(())
}
/// Pops the oldest queued command, if any, applies it, and sends the outcome on the
/// command's reply channel.
///
/// Returns the `OK` atom whether or not a command was present. A successful `Close`
/// additionally requests shutdown on `context`.
///
/// # Errors
/// Returns the `BADARG` atom when the command-queue lock cannot be acquired.
// NOTE(review): commands still queued when a `Close` succeeds are not drained here;
// confirm their reply channels are resolved elsewhere.
pub(super) fn process_next_command(
    &self,
    context: &mut ProcessContext<'_>,
) -> Result<Term, Term> {
    let next = {
        let mut queue = lock(&self.commands, "actor command queue")
            .map_err(|_| Term::atom(Atom::BADARG))?;
        queue.pop_front()
    };
    if let Some(command) = next {
        match command.kind {
            QueuedCommandKind::Boot { reply } => {
                let linked = beam::link_participants(self, context);
                send_reply(&reply, linked);
            }
            QueuedCommandKind::Send { message, reply } => {
                let sent = self.apply_send(message);
                send_reply(&reply, sent);
            }
            QueuedCommandKind::Receive { reply } => self.apply_receive(reply),
            QueuedCommandKind::Close { reply } => {
                let closed = self.apply_close();
                // Decide before the result is moved into the reply.
                let shut_down = closed.is_ok();
                send_reply(&reply, closed);
                if shut_down {
                    context.request_shutdown();
                }
            }
            QueuedCommandKind::QueryState { reply } => {
                let state = self.snapshot();
                send_reply(&reply, state);
            }
        }
    }
    Ok(Term::atom(Atom::OK))
}
/// Applies a send: activates the state, records `message` as sent, then routes it.
///
/// Routing, in order:
/// 1. if a participant channel exists, forward to the first one and return its result;
/// 2. if participants are configured but no channel exists, return `Ok(())`;
/// 3. otherwise hand the message to the oldest pending receive (recording it as
///    received), or buffer it in the inbox when nobody is waiting.
///
/// # Errors
/// Propagates lock errors, the activation error, and the forward result.
fn apply_send(&self, message: Envelope) -> Result<(), LiminalError> {
    {
        let mut state = lock(&self.state, "conversation state")?;
        state.activate()?;
        state.record_sent(message.clone());
    }
    match self.participant_channels.first() {
        Some(channel) => {
            return channel.forward(
                message,
                &self.supervisor.scheduler,
                self.supervisor.participant_wakeup_atom,
            );
        }
        None if !self.config.participants.is_empty() => return Ok(()),
        None => {}
    }
    let waiter = lock(&self.pending_receives, "pending receives")?.pop_front();
    match waiter {
        Some(waiter) => {
            lock(&self.state, "conversation state")?.record_received(message.clone());
            send_reply(&waiter, Ok(message));
        }
        None => {
            lock(&self.inbox, "conversation inbox")?.push_back(message);
        }
    }
    Ok(())
}
/// Hands a participant's `reply` to the oldest pending receive, or buffers it in
/// the inbox when no receive is waiting.
///
/// The reply is recorded as received only when it is handed straight to a waiter.
/// A buffered reply is recorded by `apply_receive` when it is later popped from the
/// inbox, so recording it here as well would count it twice. This mirrors the
/// loopback path in `apply_send`.
///
/// # Errors
/// Propagates errors from acquiring the pending-receives, state, or inbox locks.
pub fn deliver_participant_reply(&self, reply: Envelope) -> Result<(), LiminalError> {
    let waiter = lock(&self.pending_receives, "pending receives")?.pop_front();
    if let Some(waiter) = waiter {
        lock(&self.state, "conversation state")?.record_received(reply.clone());
        send_reply(&waiter, Ok(reply));
    } else {
        lock(&self.inbox, "conversation inbox")?.push_back(reply);
    }
    Ok(())
}
/// Applies a receive: replies with the oldest buffered envelope, or parks `reply`
/// in the pending-receives queue until an envelope arrives.
///
/// Every failure is reported on `reply` instead of being returned:
/// - a lock error,
/// - `ParticipantCrashed` when the phase is `Failed` and the inbox is empty,
/// - the activation error.
///
/// An envelope taken from the inbox is recorded as received before it is sent.
// NOTE(review): if `activate` fails after an envelope was popped from the inbox, that
// envelope is dropped with the error reply; confirm this is intended.
fn apply_receive(&self, reply: mpsc::SyncSender<Result<Envelope, LiminalError>>) {
    // Collect every fallible step so that all error paths share one reply site.
    let taken = (|| -> Result<Option<Envelope>, LiminalError> {
        let buffered = lock(&self.inbox, "conversation inbox")?.pop_front();
        let mut state = lock(&self.state, "conversation state")?;
        if buffered.is_none() && state.current_phase == ConversationPhase::Failed {
            return Err(LiminalError::ParticipantCrashed {
                message: "conversation participant crashed".to_owned(),
            });
        }
        state.activate()?;
        if let Some(envelope) = &buffered {
            state.record_received(envelope.clone());
        }
        Ok(buffered)
    })();
    match taken {
        Ok(Some(envelope)) => send_reply(&reply, Ok(envelope)),
        Ok(None) => match lock(&self.pending_receives, "pending receives") {
            Ok(mut pending) => pending.push_back(reply),
            Err(error) => send_reply(&reply, Err(error)),
        },
        Err(error) => send_reply(&reply, Err(error)),
    }
}
/// Closes the conversation and fails every receive that is still pending.
///
/// A conversation still in the `Created` phase is activated first; the state is then
/// moved through `begin_completing` and `close`. Each pending receiver is answered
/// with `ConversationFailed`.
///
/// # Errors
/// Propagates lock errors and any state-transition error; on such an error the
/// pending receivers are left untouched.
fn apply_close(&self) -> Result<(), LiminalError> {
    {
        let mut state = lock(&self.state, "conversation state")?;
        if state.current_phase == ConversationPhase::Created {
            state.activate()?;
        }
        state.begin_completing()?;
        state.close()?;
    }
    let mut waiters = lock(&self.pending_receives, "pending receives")?;
    while let Some(waiter) = waiters.pop_front() {
        let failure = LiminalError::ConversationFailed {
            message: "conversation closed before receive completed".to_owned(),
        };
        send_reply(&waiter, Err(failure));
    }
    Ok(())
}
fn snapshot(&self) -> Result<ConversationState, LiminalError> {
Ok(lock(&self.state, "conversation state")?.clone())
}
/// Registers `notifier` for `participant` in the exit-notifier registry, passing the
/// state's participant list to the registry. The state lock is held for the call.
///
/// # Errors
/// Propagates the state-lock error and whatever the registry returns.
pub(super) fn register_exit_notifier(
    &self,
    participant: ParticipantPid,
    notifier: mpsc::SyncSender<Instant>,
) -> Result<(), LiminalError> {
    let state = lock(&self.state, "conversation state")?;
    let known_participants = &state.participants;
    self.exit_notifiers
        .register(participant, notifier, known_participants)
}
/// Handles the exit of `participant`: deregisters it from the participant runtime,
/// records the crash in the conversation state, signals its exit notifiers, and,
/// under `CrashPolicy::Fail`, fails the conversation and every pending receive.
///
/// A failure to signal the exit notifiers no longer short-circuits the failure
/// handling. The conversation is still failed and pending receivers are still
/// released; the signalling error is reported afterwards.
///
/// # Errors
/// Propagates lock errors, and the exit-notifier signalling error once the crash
/// handling has completed.
pub(super) fn handle_participant_exit(
    &self,
    participant: ParticipantPid,
) -> Result<(), LiminalError> {
    let observed_at = Instant::now();
    self.supervisor.participant_runtime.deregister(participant);
    let fail_conversation = self.config.on_crash == CrashPolicy::Fail;
    let signalled = {
        let mut state = lock(&self.state, "conversation state")?;
        state.record_participant_crash(participant, self.config.on_crash, observed_at);
        // Keep the result instead of returning early: waiters must not be left
        // blocked just because a notifier could not be signalled.
        let signalled = self.exit_notifiers.signal(participant, observed_at);
        if fail_conversation {
            state.fail();
        }
        signalled
    };
    if fail_conversation {
        let message = format!("conversation participant {} crashed", participant.get());
        for reply in lock(&self.pending_receives, "pending receives")?.drain(..) {
            send_reply(
                &reply,
                Err(LiminalError::ParticipantCrashed {
                    message: message.clone(),
                }),
            );
        }
    }
    signalled?;
    Ok(())
}
/// Reports whether the conversation's current phase is `Closed`.
///
/// # Errors
/// Propagates the error from acquiring the state lock.
fn is_closed(&self) -> Result<bool, LiminalError> {
    let state = lock(&self.state, "conversation state")?;
    let closed = matches!(state.current_phase, ConversationPhase::Closed);
    Ok(closed)
}
}