use std::{path::Path, thread, thread::JoinHandle};
use async_channel::{bounded, unbounded};
use jlrs_sys::jlrs_gc_safe_enter;
#[cfg(feature = "multi-rt")]
use crate::runtime::handle::mt_handle::MtHandle;
use crate::{
InstallJlrsCore,
error::{JlrsError, RuntimeError},
memory::{get_tls, stack_frame::StackFrame},
prelude::JlrsResult,
runtime::{
builder::{Builder, init_runtime},
executor::Executor,
handle::async_handle::{
AsyncHandle, cancellation_token::CancellationToken, on_main_thread,
},
state::{can_init, set_exit},
},
};
pub struct AsyncBuilder<E: Executor<N>, const N: usize> {
builder: Builder,
channel_capacity: usize,
executor_opts: E,
}
impl<E: Executor<N>, const N: usize> AsyncBuilder<E, N> {
#[inline]
pub(super) fn new(builder: Builder, executor_opts: E) -> Self {
AsyncBuilder {
builder,
channel_capacity: 0,
executor_opts,
}
}
#[inline]
pub fn spawn(self) -> JlrsResult<(AsyncHandle, JoinHandle<()>)> {
spawn_main(self.builder, self.executor_opts, self.channel_capacity)
}
#[inline]
pub fn start<T: 'static + Send>(
self,
func: impl 'static + Send + FnOnce(AsyncHandle) -> T,
) -> JlrsResult<T> {
run_main(
self.builder,
self.executor_opts,
self.channel_capacity,
func,
)
}
#[inline]
#[cfg(feature = "multi-rt")]
pub fn start_mt<'env, T: 'static + Send, F>(self, func: F) -> JlrsResult<T>
where
F: 'env + Send + for<'scope> FnOnce(MtHandle<'scope, 'env>, AsyncHandle) -> T,
{
mt_impl::run_main_mt(
self.builder,
self.executor_opts,
self.channel_capacity,
func,
)
}
#[inline]
pub fn channel_capacity(mut self, capacity: usize) -> Self {
self.channel_capacity = capacity;
self
}
#[inline]
pub const fn n_threads(mut self, n: usize) -> Self {
self.builder.n_threads = n;
self
}
#[inline]
pub const fn n_interactive_threads(mut self, n: usize) -> Self {
self.builder.n_threadsi = n;
self
}
#[inline]
pub unsafe fn image<P, Q>(mut self, julia_bindir: P, image_path: Q) -> Result<Self, Self>
where
P: AsRef<Path> + Send + 'static,
Q: AsRef<Path> + Send + 'static,
{
if !julia_bindir.as_ref().exists() {
return Err(self);
}
if !image_path.as_ref().exists() {
return Err(self);
}
self.builder.image = Some((
julia_bindir.as_ref().to_path_buf(),
image_path.as_ref().to_path_buf(),
));
Ok(self)
}
#[inline]
pub fn install_jlrs(mut self, install: InstallJlrsCore) -> Self {
self.builder.install_jlrs_core = install;
self
}
}
pub(crate) fn spawn_main<R: Executor<N>, const N: usize>(
builder: Builder,
executor_opts: R,
channel_capacity: usize,
) -> JlrsResult<(AsyncHandle, JoinHandle<()>)> {
if !can_init() {
Err(RuntimeError::AlreadyInitialized)?;
}
let token = CancellationToken::new();
let t2 = token.clone();
let (sender, receiver) = if channel_capacity == 0 {
unbounded()
} else {
bounded(channel_capacity)
};
let thread_handle = std::thread::spawn(move || unsafe {
init_runtime(&builder);
let ptls = get_tls();
jlrs_gc_safe_enter(ptls);
let mut base_frame = StackFrame::<N>::new_n();
executor_opts.block_on(on_main_thread::<R, N>(receiver, token, &mut base_frame));
set_exit();
});
unsafe {
let handle = AsyncHandle::new_main(sender, t2);
Ok((handle, thread_handle))
}
}
pub(crate) fn run_main<T: 'static + Send, R: Executor<N>, const N: usize>(
builder: Builder,
executor_opts: R,
channel_capacity: usize,
func: impl 'static + Send + FnOnce(AsyncHandle) -> T,
) -> JlrsResult<T> {
if !can_init() {
Err(RuntimeError::AlreadyInitialized)?;
}
unsafe {
init_runtime(&builder);
let token = CancellationToken::new();
let t2 = token.clone();
let (sender, receiver) = if channel_capacity == 0 {
unbounded()
} else {
bounded(channel_capacity)
};
let handle = AsyncHandle::new_main(sender, t2);
let ptls = get_tls();
jlrs_gc_safe_enter(ptls);
let handle = thread::spawn(move || func(handle));
let mut base_frame = StackFrame::<N>::new_n();
executor_opts.block_on(on_main_thread::<R, N>(receiver, token, &mut base_frame));
set_exit();
handle
.join()
.map_err(|_| Box::new(JlrsError::exception("thread panicked")))
}
}
#[cfg(feature = "multi-rt")]
mod mt_impl {
use std::{
panic::{AssertUnwindSafe, catch_unwind},
thread,
};
use jl_sys::jl_atexit_hook;
use super::super::{Builder, init_runtime};
use crate::{
error::{JlrsError, RuntimeError},
memory::{gc::gc_safe, stack_frame::StackFrame},
prelude::JlrsResult,
runtime::{
executor::Executor,
handle::{
async_handle::{
AsyncHandle, cancellation_token::CancellationToken, channel::channel,
on_main_thread,
},
mt_handle::{EXIT_LOCK, MtHandle, wait_loop},
wait,
},
state::{can_init, set_exit},
},
};
pub(crate) fn run_main_mt<'env, T, E, F, const N: usize>(
options: Builder,
executor_opts: E,
channel_capacity: usize,
func: F,
) -> JlrsResult<T>
where
T: Send + 'static,
E: Executor<N>,
F: 'env + Send + for<'scope> FnOnce(MtHandle<'scope, 'env>, AsyncHandle) -> T,
{
if !can_init() {
Err(RuntimeError::AlreadyInitialized)?;
}
let token = CancellationToken::new();
let t2 = token.clone();
let (sender, receiver) = channel(channel_capacity);
unsafe {
init_runtime(&options);
}
let async_handle = unsafe { AsyncHandle::new_main(sender, t2) };
let ret = thread::scope(|scope| {
let handle = scope.spawn(|| unsafe {
thread::scope(|scope| {
let handle = MtHandle::new(scope);
func(handle, async_handle)
})
});
unsafe {
let mut base_frame = StackFrame::<N>::new_n();
let res = catch_unwind(AssertUnwindSafe(|| {
executor_opts.block_on(on_main_thread::<E, N>(
receiver,
token,
&mut base_frame,
));
}));
wait_loop();
let th_res = handle.join();
match res {
Ok(_) => {
gc_safe(|| wait(&EXIT_LOCK));
set_exit();
jl_atexit_hook(0);
}
Err(_) => {
set_exit();
jl_atexit_hook(1);
}
}
th_res
}
});
match ret {
Ok(ret) => Ok(ret),
Err(e) => Err(JlrsError::exception(format!("{e:?}")))?,
}
}
}