Skip to main content

liminal/conversation/
actor.rs

1use std::any::Any;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex, mpsc};
4use std::time::Instant;
5
6use beamr::atom::{Atom, AtomTable};
7use beamr::module::ModuleRegistry;
8use beamr::scheduler::{Scheduler, SchedulerConfig};
9
10mod backend;
11mod beam;
12mod core;
13mod exit;
14mod queue;
15mod sync;
16
17use crate::conversation::participant::{
18    ParticipantBehaviour, ParticipantChannel, ParticipantProcess, ParticipantRuntime,
19};
20use crate::conversation::types::{
21    ConversationConfig, ConversationHandle, ConversationState, CrashPolicy, ParticipantPid,
22};
23use crate::envelope::Envelope;
24use crate::error::LiminalError;
25use backend::ActorBackend;
26use beam::{ActorRuntime, actor_module};
27pub(crate) use core::ActorCore;
28
29#[cfg(test)]
30mod tests;
31
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub enum ConversationCommand {
34    Send(Envelope),
35    Receive,
36    Close,
37    QueryState,
38}
39
40#[derive(Clone, Debug)]
41pub struct ConversationSupervisor {
42    inner: Arc<SupervisorInner>,
43}
44
45impl ConversationSupervisor {
46    /// # Errors
47    /// Returns [`LiminalError`] when the beamr scheduler cannot start.
48    pub fn new() -> Result<Self, LiminalError> {
49        SupervisorInner::new().map(|inner| Self {
50            inner: Arc::new(inner),
51        })
52    }
53
54    /// Spawns one supervised conversation actor over the given participant pids.
55    ///
56    /// The participants are linked for crash detection but are NOT forwarded
57    /// requests (they are inert from the conversation's perspective). Use
58    /// [`ConversationSupervisor::spawn_with_participant`] to attach a real
59    /// participant that processes forwarded messages.
60    ///
61    /// # Errors
62    /// Returns [`LiminalError`] when spawn, boot, or participant linking fails.
63    pub fn spawn(&self, config: ConversationConfig) -> Result<ConversationActor, LiminalError> {
64        let core = Arc::new(ActorCore::new(Arc::clone(&self.inner), config, Vec::new()));
65        self.inner.spawn_actor_for(&core)?;
66        let handle = ConversationHandle::new(Arc::new(ActorBackend {
67            core: Arc::clone(&core),
68        }));
69        Ok(ConversationActor { core, handle })
70    }
71
72    /// Spawns a real participant native process running `behaviour`, then a
73    /// supervised conversation actor linked to it. Requests sent through the
74    /// returned actor's handle are FORWARDED to the participant process, which
75    /// genuinely processes them and delivers any reply back into the
76    /// conversation — the request-reply path from LIM-005.
77    ///
78    /// Returns the actor and the spawned participant's pid (for crash injection
79    /// and linkage assertions).
80    ///
81    /// # Errors
82    /// Returns [`LiminalError`] when participant spawn, actor spawn, boot, or
83    /// linking fails.
84    pub fn spawn_with_participant(
85        &self,
86        behaviour: Arc<dyn ParticipantBehaviour>,
87        timeout: Option<std::time::Duration>,
88        mode: crate::channel::ChannelMode,
89        on_crash: CrashPolicy,
90    ) -> Result<(ConversationActor, ParticipantPid), LiminalError> {
91        let channel = self.inner.spawn_participant()?;
92        let participant = channel.pid();
93        let config = ConversationConfig::new(vec![participant], timeout, mode, on_crash);
94        let core = Arc::new(ActorCore::new(
95            Arc::clone(&self.inner),
96            config,
97            vec![channel.clone()],
98        ));
99        // Register the participant with its inbox, behaviour, and a weak handle to
100        // the core so produced replies route back into this conversation.
101        self.inner.participant_runtime.register(
102            participant,
103            channel.inbox_arc(),
104            behaviour,
105            Arc::downgrade(&core),
106        )?;
107        self.inner.spawn_actor_for(&core)?;
108        let handle = ConversationHandle::new(Arc::new(ActorBackend {
109            core: Arc::clone(&core),
110        }));
111        Ok((ConversationActor { core, handle }, participant))
112    }
113
114    /// Returns the scheduler used by this supervisor.
115    #[must_use]
116    pub fn scheduler(&self) -> Arc<Scheduler> {
117        Arc::clone(&self.inner.scheduler)
118    }
119
120    /// Stops the underlying scheduler.
121    pub fn shutdown(&self) {
122        self.inner.scheduler.shutdown();
123    }
124}
125
126#[derive(Clone, Debug)]
127pub struct ConversationActor {
128    core: Arc<ActorCore>,
129    handle: ConversationHandle,
130}
131
132impl ConversationActor {
133    /// Returns a cloneable command handle.
134    #[must_use]
135    pub fn handle(&self) -> ConversationHandle {
136        self.handle.clone()
137    }
138
139    /// Returns the current actor PID, restarting after crash when needed.
140    ///
141    /// # Errors
142    /// Returns [`LiminalError`] when the actor is closed or cannot restart.
143    pub fn pid(&self) -> Result<ParticipantPid, LiminalError> {
144        self.core.ensure_running()
145    }
146
147    /// Queries actor state.
148    ///
149    /// # Errors
150    /// Returns [`LiminalError`] when the actor cannot service the query.
151    pub fn state(&self) -> Result<ConversationState, LiminalError> {
152        self.handle.query_state()
153    }
154
155    /// Receives the next reply from the conversation, bounded by `timeout`.
156    ///
157    /// Returns [`LiminalError::ConversationTimeout`] if no reply arrives in time,
158    /// or [`LiminalError::ParticipantCrashed`] if a linked participant crashes
159    /// while waiting (the crash drains the pending receive immediately).
160    ///
161    /// # Errors
162    /// Returns [`LiminalError`] on timeout, participant crash, or actor failure.
163    pub fn receive_timeout(&self, timeout: std::time::Duration) -> Result<Envelope, LiminalError> {
164        self.core.submit_receive_timeout(timeout)
165    }
166
167    /// Registers a one-shot notifier fired the instant `participant`'s trapped
168    /// EXIT is processed (carrying the observed [`Instant`] — a structural link
169    /// wakeup, not a poll). If `participant` is already dead at registration
170    /// (it crashed before this call), the recorded EXIT instant is replayed
171    /// immediately, so a crash-before-register is never lost. See
172    /// [`ActorCore::register_exit_notifier`].
173    ///
174    /// # Errors
175    /// Returns [`LiminalError`] when a state or registry lock is poisoned.
176    pub fn notify_on_participant_exit(
177        &self,
178        participant: ParticipantPid,
179        notifier: mpsc::SyncSender<Instant>,
180    ) -> Result<(), LiminalError> {
181        self.core.register_exit_notifier(participant, notifier)
182    }
183}
184
185struct SupervisorInner {
186    scheduler: Arc<Scheduler>,
187    runtime: Arc<ActorRuntime>,
188    participant_runtime: Arc<ParticipantRuntime>,
189    participant_wakeup_atom: Atom,
190    module_name: Atom,
191    entry_function: Atom,
192}
193
194impl std::fmt::Debug for SupervisorInner {
195    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196        formatter
197            .debug_struct("SupervisorInner")
198            .field("runtime", &self.runtime)
199            .field("module_name", &self.module_name)
200            .field("entry_function", &self.entry_function)
201            .finish_non_exhaustive()
202    }
203}
204
205impl SupervisorInner {
206    fn new() -> Result<Self, LiminalError> {
207        let atoms = AtomTable::with_common_atoms();
208        let module_name = atoms.intern("liminal_conversation_actor");
209        let entry_function = atoms.intern("main");
210        let command_function = atoms.intern("process_command");
211        let command_atom = atoms.intern("liminal_conversation_command");
212        let participant_wakeup_atom = atoms.intern("liminal_conversation_participant_wakeup");
213        let runtime = Arc::new(ActorRuntime::new(command_atom));
214        let participant_runtime = Arc::new(ParticipantRuntime::default());
215        let registry = Arc::new(ModuleRegistry::new());
216        registry.insert(actor_module(module_name, entry_function, command_function));
217        let private_data: Arc<dyn Any + Send + Sync> = runtime.clone();
218        let scheduler = Scheduler::new(
219            SchedulerConfig {
220                thread_count: Some(1),
221                nif_private_data: Some(private_data),
222                ..SchedulerConfig::default()
223            },
224            registry,
225        )
226        .map_err(|message| LiminalError::ConversationFailed { message })?;
227        Ok(Self {
228            scheduler: Arc::new(scheduler),
229            runtime,
230            participant_runtime,
231            participant_wakeup_atom,
232            module_name,
233            entry_function,
234        })
235    }
236
237    /// Spawns a real participant native process running `behaviour`, registers it
238    /// with the participant runtime, and returns the channel the conversation
239    /// actor forwards requests through plus the participant pid. The process is a
240    /// first-class beamr [`NativeHandler`]; the conversation actor links to it
241    /// during boot for structural crash detection.
242    fn spawn_participant(&self) -> Result<ParticipantChannel, LiminalError> {
243        let runtime = Arc::clone(&self.participant_runtime);
244        let wakeup_atom = self.participant_wakeup_atom;
245        let factory = Box::new(move || {
246            Box::new(ParticipantProcess::new(Arc::clone(&runtime), wakeup_atom))
247                as Box<dyn beamr::native::native_process::NativeHandler>
248        });
249        let pid = self.scheduler.spawn_native(factory).map_err(|error| {
250            LiminalError::ConversationFailed {
251                message: format!("failed to spawn conversation participant: {error}"),
252            }
253        })?;
254        let participant = ParticipantPid::new(pid);
255        let inbox = Arc::new(Mutex::new(VecDeque::new()));
256        Ok(ParticipantChannel::new(participant, inbox))
257    }
258
259    fn spawn_actor_for(&self, core: &Arc<ActorCore>) -> Result<ParticipantPid, LiminalError> {
260        let pid = self
261            .scheduler
262            .spawn_trap_exit(self.module_name, self.entry_function, Vec::new())
263            .map_err(|error| LiminalError::ConversationFailed {
264                message: format!("failed to spawn conversation actor: {error}"),
265            })?;
266        let participant = ParticipantPid::new(pid);
267        self.runtime.register(participant, Arc::downgrade(core))?;
268        core.set_current_pid(participant)?;
269        core.boot(participant)?;
270        Ok(participant)
271    }
272}