1use std::any::Any;
2use std::collections::VecDeque;
3use std::sync::{Arc, Mutex, mpsc};
4use std::time::Instant;
5
6use beamr::atom::{Atom, AtomTable};
7use beamr::module::ModuleRegistry;
8use beamr::scheduler::{Scheduler, SchedulerConfig};
9
10mod backend;
11mod beam;
12mod core;
13mod exit;
14mod queue;
15mod sync;
16
17use crate::conversation::participant::{
18 ParticipantBehaviour, ParticipantChannel, ParticipantProcess, ParticipantRuntime,
19};
20use crate::conversation::types::{
21 ConversationConfig, ConversationHandle, ConversationState, CrashPolicy, ParticipantPid,
22};
23use crate::envelope::Envelope;
24use crate::error::LiminalError;
25use backend::ActorBackend;
26use beam::{ActorRuntime, actor_module};
27pub(crate) use core::ActorCore;
28
29#[cfg(test)]
30mod tests;
31
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub enum ConversationCommand {
34 Send(Envelope),
35 Receive,
36 Close,
37 QueryState,
38}
39
40#[derive(Clone, Debug)]
41pub struct ConversationSupervisor {
42 inner: Arc<SupervisorInner>,
43}
44
45impl ConversationSupervisor {
46 pub fn new() -> Result<Self, LiminalError> {
49 SupervisorInner::new().map(|inner| Self {
50 inner: Arc::new(inner),
51 })
52 }
53
54 pub fn spawn(&self, config: ConversationConfig) -> Result<ConversationActor, LiminalError> {
64 let core = Arc::new(ActorCore::new(Arc::clone(&self.inner), config, Vec::new()));
65 self.inner.spawn_actor_for(&core)?;
66 let handle = ConversationHandle::new(Arc::new(ActorBackend {
67 core: Arc::clone(&core),
68 }));
69 Ok(ConversationActor { core, handle })
70 }
71
72 pub fn spawn_with_participant(
85 &self,
86 behaviour: Arc<dyn ParticipantBehaviour>,
87 timeout: Option<std::time::Duration>,
88 mode: crate::channel::ChannelMode,
89 on_crash: CrashPolicy,
90 ) -> Result<(ConversationActor, ParticipantPid), LiminalError> {
91 let channel = self.inner.spawn_participant()?;
92 let participant = channel.pid();
93 let config = ConversationConfig::new(vec![participant], timeout, mode, on_crash);
94 let core = Arc::new(ActorCore::new(
95 Arc::clone(&self.inner),
96 config,
97 vec![channel.clone()],
98 ));
99 self.inner.participant_runtime.register(
102 participant,
103 channel.inbox_arc(),
104 behaviour,
105 Arc::downgrade(&core),
106 )?;
107 self.inner.spawn_actor_for(&core)?;
108 let handle = ConversationHandle::new(Arc::new(ActorBackend {
109 core: Arc::clone(&core),
110 }));
111 Ok((ConversationActor { core, handle }, participant))
112 }
113
114 #[must_use]
116 pub fn scheduler(&self) -> Arc<Scheduler> {
117 Arc::clone(&self.inner.scheduler)
118 }
119
120 pub fn shutdown(&self) {
122 self.inner.scheduler.shutdown();
123 }
124}
125
126#[derive(Clone, Debug)]
127pub struct ConversationActor {
128 core: Arc<ActorCore>,
129 handle: ConversationHandle,
130}
131
132impl ConversationActor {
133 #[must_use]
135 pub fn handle(&self) -> ConversationHandle {
136 self.handle.clone()
137 }
138
139 pub fn pid(&self) -> Result<ParticipantPid, LiminalError> {
144 self.core.ensure_running()
145 }
146
147 pub fn state(&self) -> Result<ConversationState, LiminalError> {
152 self.handle.query_state()
153 }
154
155 pub fn receive_timeout(&self, timeout: std::time::Duration) -> Result<Envelope, LiminalError> {
164 self.core.submit_receive_timeout(timeout)
165 }
166
167 pub fn notify_on_participant_exit(
177 &self,
178 participant: ParticipantPid,
179 notifier: mpsc::SyncSender<Instant>,
180 ) -> Result<(), LiminalError> {
181 self.core.register_exit_notifier(participant, notifier)
182 }
183}
184
185struct SupervisorInner {
186 scheduler: Arc<Scheduler>,
187 runtime: Arc<ActorRuntime>,
188 participant_runtime: Arc<ParticipantRuntime>,
189 participant_wakeup_atom: Atom,
190 module_name: Atom,
191 entry_function: Atom,
192}
193
194impl std::fmt::Debug for SupervisorInner {
195 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
196 formatter
197 .debug_struct("SupervisorInner")
198 .field("runtime", &self.runtime)
199 .field("module_name", &self.module_name)
200 .field("entry_function", &self.entry_function)
201 .finish_non_exhaustive()
202 }
203}
204
205impl SupervisorInner {
206 fn new() -> Result<Self, LiminalError> {
207 let atoms = AtomTable::with_common_atoms();
208 let module_name = atoms.intern("liminal_conversation_actor");
209 let entry_function = atoms.intern("main");
210 let command_function = atoms.intern("process_command");
211 let command_atom = atoms.intern("liminal_conversation_command");
212 let participant_wakeup_atom = atoms.intern("liminal_conversation_participant_wakeup");
213 let runtime = Arc::new(ActorRuntime::new(command_atom));
214 let participant_runtime = Arc::new(ParticipantRuntime::default());
215 let registry = Arc::new(ModuleRegistry::new());
216 registry.insert(actor_module(module_name, entry_function, command_function));
217 let private_data: Arc<dyn Any + Send + Sync> = runtime.clone();
218 let scheduler = Scheduler::new(
219 SchedulerConfig {
220 thread_count: Some(1),
221 nif_private_data: Some(private_data),
222 ..SchedulerConfig::default()
223 },
224 registry,
225 )
226 .map_err(|message| LiminalError::ConversationFailed { message })?;
227 Ok(Self {
228 scheduler: Arc::new(scheduler),
229 runtime,
230 participant_runtime,
231 participant_wakeup_atom,
232 module_name,
233 entry_function,
234 })
235 }
236
237 fn spawn_participant(&self) -> Result<ParticipantChannel, LiminalError> {
243 let runtime = Arc::clone(&self.participant_runtime);
244 let wakeup_atom = self.participant_wakeup_atom;
245 let factory = Box::new(move || {
246 Box::new(ParticipantProcess::new(Arc::clone(&runtime), wakeup_atom))
247 as Box<dyn beamr::native::native_process::NativeHandler>
248 });
249 let pid = self.scheduler.spawn_native(factory).map_err(|error| {
250 LiminalError::ConversationFailed {
251 message: format!("failed to spawn conversation participant: {error}"),
252 }
253 })?;
254 let participant = ParticipantPid::new(pid);
255 let inbox = Arc::new(Mutex::new(VecDeque::new()));
256 Ok(ParticipantChannel::new(participant, inbox))
257 }
258
259 fn spawn_actor_for(&self, core: &Arc<ActorCore>) -> Result<ParticipantPid, LiminalError> {
260 let pid = self
261 .scheduler
262 .spawn_trap_exit(self.module_name, self.entry_function, Vec::new())
263 .map_err(|error| LiminalError::ConversationFailed {
264 message: format!("failed to spawn conversation actor: {error}"),
265 })?;
266 let participant = ParticipantPid::new(pid);
267 self.runtime.register(participant, Arc::downgrade(core))?;
268 core.set_current_pid(participant)?;
269 core.boot(participant)?;
270 Ok(participant)
271 }
272}