use std::marker::PhantomData;
use super::{lifecycles, AbstractProcess, ProcessRef, StartupError};
use crate::function::process::{process_name, ProcessType};
use crate::{LunaticError, Mailbox, Process, ProcessConfig, ProcessName, Tag};
trait IntoAbstractProcessBuilder<T> {}
pub struct AbstractProcessBuilder<'a, T: ?Sized> {
link: Option<Tag>,
config: Option<&'a ProcessConfig>,
node: Option<u64>,
phantom: PhantomData<T>,
}
impl<'a, T> AbstractProcessBuilder<'a, T>
where
T: AbstractProcess,
{
pub(crate) fn new() -> AbstractProcessBuilder<'a, T> {
AbstractProcessBuilder {
link: None,
config: None,
node: None,
phantom: PhantomData,
}
}
pub fn link(self) -> AbstractProcessBuilder<'a, T> {
AbstractProcessBuilder {
link: Some(Tag::new()),
config: self.config,
node: self.node,
phantom: PhantomData,
}
}
pub fn link_with(self, tag: Tag) -> AbstractProcessBuilder<'a, T> {
AbstractProcessBuilder {
link: Some(tag),
config: self.config,
node: self.node,
phantom: PhantomData,
}
}
pub fn configure(self, config: &'a ProcessConfig) -> AbstractProcessBuilder<'a, T> {
AbstractProcessBuilder {
link: self.link,
config: Some(config),
node: self.node,
phantom: PhantomData,
}
}
pub fn on_node(self, node: u64) -> AbstractProcessBuilder<'a, T> {
AbstractProcessBuilder {
link: self.link,
config: self.config,
node: Some(node),
phantom: PhantomData,
}
}
#[track_caller]
pub fn start(&self, arg: T::Arg) -> Result<ProcessRef<T>, StartupError<T>> {
let init_tag = Tag::new();
let this = unsafe { Process::<Result<(), StartupError<T>>, T::Serializer>::this() };
let entry_data = (this, init_tag, arg);
let process = match (self.link, &self.config, self.node) {
(Some(_), _, Some(_node)) => {
unimplemented!("Linking across nodes is not supported yet");
}
(Some(tag), Some(config), None) => Process::<(), T::Serializer>::spawn_link_config_tag(
config,
entry_data,
tag,
lifecycles::entry::<T>,
),
(Some(tag), None, None) => Process::<(), T::Serializer>::spawn_link_tag(
entry_data,
tag,
lifecycles::entry::<T>,
),
(None, Some(config), Some(node)) => Process::<(), T::Serializer>::spawn_node_config(
node,
config,
entry_data,
lifecycles::entry::<T>,
),
(None, None, Some(node)) => {
Process::<(), T::Serializer>::spawn_node(node, entry_data, lifecycles::entry::<T>)
}
(None, Some(config), None) => Process::<(), T::Serializer>::spawn_config(
config,
entry_data,
lifecycles::entry::<T>,
),
(None, None, None) => {
Process::<(), T::Serializer>::spawn(entry_data, lifecycles::entry::<T>)
}
};
let mailbox: Mailbox<Result<(), StartupError<T>>, T::Serializer> =
unsafe { Mailbox::new() };
match mailbox.tag_receive(&[init_tag]) {
Ok(()) => Ok(ProcessRef { process }),
Err(err) => Err(err),
}
}
#[track_caller]
pub fn start_as<N: ProcessName>(
&self,
name: &N,
arg: T::Arg,
) -> Result<ProcessRef<T>, StartupError<T>> {
let name: &str = name.process_name();
let name = process_name::<T, T::Serializer>(ProcessType::ProcessRef, name);
let init_tag = Tag::new();
let this = unsafe { Process::<Result<(), StartupError<T>>, T::Serializer>::this() };
let entry_data = (this, init_tag, arg);
let process = match (self.link, &self.config, self.node) {
(Some(_), _, Some(_node)) => {
unimplemented!("Linking across nodes is not supported yet");
}
(Some(tag), Some(config), None) => {
Process::<(), T::Serializer>::name_spawn_link_config_tag(
&name,
config,
entry_data,
tag,
lifecycles::entry::<T>,
)
}
(Some(tag), None, None) => Process::<(), T::Serializer>::name_spawn_link_tag(
&name,
entry_data,
tag,
lifecycles::entry::<T>,
),
(None, Some(config), Some(node)) => {
Process::<(), T::Serializer>::name_spawn_node_config(
&name,
node,
config,
entry_data,
lifecycles::entry::<T>,
)
}
(None, None, Some(node)) => Process::<(), T::Serializer>::name_spawn_node(
&name,
node,
entry_data,
lifecycles::entry::<T>,
),
(None, Some(config), None) => Process::<(), T::Serializer>::name_spawn_config(
&name,
config,
entry_data,
lifecycles::entry::<T>,
),
(None, None, None) => {
Process::<(), T::Serializer>::name_spawn(&name, entry_data, lifecycles::entry::<T>)
}
};
let process = match process {
Ok(process) => process,
Err(LunaticError::NameAlreadyRegistered(node_id, process_id)) => {
return Err(StartupError::NameAlreadyRegistered(ProcessRef {
process: unsafe { Process::new(node_id, process_id) },
}));
}
_ => unreachable!(),
};
let mailbox: Mailbox<Result<(), StartupError<T>>, T::Serializer> =
unsafe { Mailbox::new() };
match mailbox.tag_receive(&[init_tag]) {
Ok(()) => Ok(ProcessRef { process }),
Err(err) => Err(err),
}
}
}