use crate::sys::{enif_alloc_env, enif_clear_env, enif_free_env, enif_send, enif_whereis_pid};
use crate::thread::is_scheduler_thread;
use crate::types::LocalPid;
use crate::wrapper::{NIF_ENV, NIF_TERM};
use crate::{Encoder, Term};
use std::marker::PhantomData;
use std::ptr;
use std::sync::{Arc, Weak};
type EnvId<'a> = PhantomData<*mut &'a u8>;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum EnvKind {
ProcessBound,
Callback,
Init,
ProcessIndependent,
}
#[derive(Clone, Copy)]
pub struct Env<'a> {
pub(crate) kind: EnvKind,
env: NIF_ENV,
id: EnvId<'a>,
}
impl<'b> PartialEq<Env<'b>> for Env<'_> {
fn eq(&self, other: &Env<'b>) -> bool {
self.env == other.env
}
}
#[derive(Clone, Copy, Debug)]
pub struct SendError;
impl<'a> Env<'a> {
#[doc(hidden)]
pub(crate) unsafe fn new_internal<T>(
_lifetime_marker: &'a T,
env: NIF_ENV,
kind: EnvKind,
) -> Env<'a> {
Env {
kind,
env,
id: PhantomData,
}
}
pub unsafe fn new<T>(_lifetime_marker: &'a T, env: NIF_ENV) -> Env<'a> {
Self::new_internal(_lifetime_marker, env, EnvKind::ProcessBound)
}
#[doc(hidden)]
pub unsafe fn new_init_env<T>(_lifetime_marker: &'a T, env: NIF_ENV) -> Env<'a> {
Self::new_internal(_lifetime_marker, env, EnvKind::Init)
}
pub fn as_c_arg(self) -> NIF_ENV {
self.env
}
pub fn error_tuple(self, reason: impl Encoder) -> Term<'a> {
let error = crate::types::atom::error().to_term(self);
(error, reason).encode(self)
}
pub fn send(self, pid: &LocalPid, message: impl Encoder) -> Result<(), SendError> {
let env = if is_scheduler_thread() {
if self.kind == EnvKind::ProcessIndependent {
return Err(SendError);
}
self.as_c_arg()
} else {
ptr::null_mut()
};
let message = message.encode(self);
let res = unsafe { enif_send(env, pid.as_c_arg(), ptr::null_mut(), message.as_c_arg()) };
if res == 1 {
Ok(())
} else {
Err(SendError)
}
}
pub fn whereis_pid(self, name_or_pid: impl Encoder) -> Option<LocalPid> {
let name_or_pid = name_or_pid.encode(self);
if name_or_pid.is_pid() {
return Some(name_or_pid.decode().unwrap());
}
let mut enif_pid = std::mem::MaybeUninit::uninit();
if unsafe {
enif_whereis_pid(
self.as_c_arg(),
name_or_pid.as_c_arg(),
enif_pid.as_mut_ptr(),
)
} == 0
{
None
} else {
let enif_pid = unsafe { enif_pid.assume_init() };
let pid = LocalPid::from_c_arg(enif_pid);
Some(pid)
}
}
pub fn binary_to_term(self, data: &[u8]) -> Option<(Term<'a>, usize)> {
unsafe {
crate::wrapper::env::binary_to_term(self.as_c_arg(), data, true)
.map(|(term, size)| (Term::new(self, term), size))
}
}
pub unsafe fn binary_to_term_trusted(self, data: &[u8]) -> Option<(Term<'a>, usize)> {
crate::wrapper::env::binary_to_term(self.as_c_arg(), data, false)
.map(|(term, size)| (Term::new(self, term), size))
}
}
pub struct OwnedEnv {
env: Arc<NIF_ENV>,
}
unsafe impl Send for OwnedEnv {}
impl OwnedEnv {
#[allow(clippy::arc_with_non_send_sync)] pub fn new() -> OwnedEnv {
OwnedEnv {
env: Arc::new(unsafe { enif_alloc_env() }),
}
}
pub fn run<'a, F, R>(&self, closure: F) -> R
where
F: FnOnce(Env<'a>) -> R,
{
let env = unsafe { Env::new_internal(&(), *self.env, EnvKind::ProcessIndependent) };
closure(env)
}
pub fn send_and_clear<'a, F, T>(
&mut self,
recipient: &LocalPid,
closure: F,
) -> Result<(), SendError>
where
F: FnOnce(Env<'a>) -> T,
T: Encoder,
{
if is_scheduler_thread() {
panic!("send_and_clear: current thread is managed");
}
let message = self.run(|env| closure(env).encode(env).as_c_arg());
let res = unsafe { enif_send(ptr::null_mut(), recipient.as_c_arg(), *self.env, message) };
self.clear();
if res == 1 {
Ok(())
} else {
Err(SendError)
}
}
#[allow(clippy::arc_with_non_send_sync)] pub fn clear(&mut self) {
let c_env = *self.env;
self.env = Arc::new(c_env);
unsafe {
enif_clear_env(c_env);
}
}
pub fn save(&self, term: impl Encoder) -> SavedTerm {
SavedTerm {
term: self.run(|env| term.encode(env).as_c_arg()),
env_generation: Arc::downgrade(&self.env),
}
}
}
impl Drop for OwnedEnv {
fn drop(&mut self) {
unsafe {
enif_free_env(*self.env);
}
}
}
#[derive(Clone)]
pub struct SavedTerm {
env_generation: Weak<NIF_ENV>,
term: NIF_TERM,
}
unsafe impl Send for SavedTerm {}
impl SavedTerm {
pub fn load<'a>(&self, env: Env<'a>) -> Term<'a> {
match self.env_generation.upgrade() {
None => panic!("term is from a cleared or dropped OwnedEnv"),
Some(ref env_arc) if **env_arc == env.as_c_arg() => unsafe {
Term::new(env, self.term)
},
_ => panic!("can't load SavedTerm into a different environment"),
}
}
}
impl Default for OwnedEnv {
fn default() -> Self {
Self::new()
}
}