use std::cell::RefCell;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use std::future::Future;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::time::Duration;
use flume::Receiver;
use flume::Sender;
use crate::ArgumentError;
use crate::AsyncCatchUnwind;
use crate::Dest;
use crate::Dests;
use crate::ExitReason;
use crate::Message;
use crate::Pid;
use crate::ProcessFlags;
use crate::ProcessInfo;
use crate::ProcessItem;
use crate::ProcessMonitor;
use crate::ProcessReceiver;
use crate::ProcessRegistration;
use crate::Receivable;
use crate::Reference;
use crate::Timeout;
use crate::alias_create;
use crate::alias_destroy;
use crate::alias_destroy_all;
use crate::link_create;
use crate::link_destroy;
use crate::link_fill_info;
use crate::link_install;
use crate::link_process_down;
use crate::monitor_create;
use crate::monitor_destroy;
use crate::monitor_destroy_all;
use crate::monitor_fill_info;
use crate::monitor_install;
use crate::monitor_process_down;
use crate::node_process_send_exit;
use crate::process_alive;
use crate::process_destroy_timer;
use crate::process_drop;
use crate::process_exit;
use crate::process_flags;
use crate::process_info;
use crate::process_insert;
use crate::process_list;
use crate::process_name_list;
use crate::process_name_lookup;
use crate::process_name_remove;
use crate::process_read_timer;
use crate::process_register;
use crate::process_register_timer;
use crate::process_send;
use crate::process_set_exit_reason;
use crate::process_set_flags;
use crate::process_unregister;
/// Sending half of the `flume` channel that carries [`ProcessItem`]s to a process.
pub(crate) type ProcessSend = Sender<ProcessItem>;
/// Receiving half of the `flume` channel that carries [`ProcessItem`]s to a process.
pub(crate) type ProcessReceive = Receiver<ProcessItem>;
/// Per-process state that lives in the `PROCESS` task-local for the duration of a spawned task.
///
/// All public functionality is exposed through associated functions on this type which look up
/// the calling process's state through the task-local rather than through `&self`.
pub struct Process {
/// Identifier of this process; this is what `Process::current()` returns.
pub(crate) pid: Pid,
/// Sending half of this process's channel; cloned whenever an alias is created.
pub(crate) sender: ProcessSend,
/// Receiving half of this process's channel.
pub(crate) receiver: ProcessReceive,
/// Interior-mutable list of process items. Not touched anywhere in this file.
/// NOTE(review): presumably items already pulled off `receiver` but not yet consumed — confirm
/// against `ProcessReceiver`.
pub(crate) items: RefCell<Vec<ProcessItem>>,
/// Ids of the aliases created through `Process::alias`; all of them are destroyed on drop.
pub(crate) aliases: RefCell<BTreeSet<u64>>,
/// Monitors owned by this process keyed by their reference; all of them are destroyed on drop.
pub(crate) monitors: RefCell<BTreeMap<Reference, ProcessMonitor>>,
}
// Task-local slot holding the `Process` state of the task that is currently running.
// It is populated by `PROCESS.scope(..)` when a process is spawned; accessing it with
// `PROCESS.with(..)` from a task that was not spawned as a process panics.
tokio::task_local! {
pub(crate) static PROCESS: Process;
}
impl Process {
    /// Builds the state for a process with the given `pid` and channel halves.
    ///
    /// The item list, alias set and monitor map all start out empty.
    pub(crate) fn new(pid: Pid, sender: ProcessSend, receiver: ProcessReceive) -> Self {
        let items = RefCell::new(Vec::new());
        let aliases = RefCell::new(BTreeSet::new());
        let monitors = RefCell::new(BTreeMap::new());
        Self {
            pid,
            sender,
            receiver,
            items,
            aliases,
            monitors,
        }
    }

    /// Creates a new alias for the calling process and returns its reference.
    ///
    /// The alias id is recorded in the calling process's alias set, then the alias is registered
    /// through `alias_create` together with a clone of the process's sender. `reply` is forwarded
    /// to `alias_create` unchanged.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process (the `PROCESS` task-local is unset).
    pub fn alias(reply: bool) -> Reference {
        let alias = Reference::new();
        let mailbox = PROCESS.with(|this| {
            this.aliases.borrow_mut().insert(alias.id());
            this.sender.clone()
        });
        alias_create(mailbox, alias, reply);
        alias
    }

    /// Forgets `alias` in the calling process's alias set and destroys it.
    ///
    /// Returns whatever `alias_destroy` reports for the alias.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn unalias(alias: Reference) -> bool {
        PROCESS.with(|this| {
            this.aliases.borrow_mut().remove(&alias.id());
        });
        alias_destroy(alias)
    }

    /// Returns the pid of the calling process.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    #[must_use]
    pub fn current() -> Pid {
        PROCESS.with(|this| this.pid)
    }

    /// Looks up the pid registered under `name`, if there is one.
    #[must_use]
    pub fn whereis<S: AsRef<str>>(name: S) -> Option<Pid> {
        let name: &str = name.as_ref();
        process_name_lookup(name)
    }

    /// Sends `message` to the given destination(s).
    pub fn send<D: Into<Dests>, M: Receivable>(dests: D, message: M) {
        let dests: Dests = dests.into();
        process_send(dests, message);
    }

    /// Sends `message` to `dest` once `duration` has elapsed and returns the timer's reference.
    ///
    /// A tokio task is spawned that sleeps for `duration`, sends the message and finally removes
    /// its own timer registration. The returned reference can be handed to
    /// [`Process::cancel_timer`] and [`Process::read_timer`].
    pub fn send_after<D: Into<Dests>, M: Receivable>(
        dest: D,
        message: M,
        duration: Duration,
    ) -> Reference {
        let dests: Dests = dest.into();
        let timer = Reference::new();
        let delayed_send = async move {
            Process::sleep(duration).await;
            Process::send(dests, message);
            process_destroy_timer(timer);
        };
        // NOTE(review): the task is spawned before the timer is registered, so with a very short
        // `duration` the task might reach `process_destroy_timer` before the registration below
        // happens — confirm the timer table tolerates a registration that is never removed.
        let handle = tokio::spawn(delayed_send);
        process_register_timer(timer, duration, handle);
        timer
    }

    /// Destroys the timer identified by `timer`.
    pub fn cancel_timer(timer: Reference) {
        process_destroy_timer(timer);
    }

    /// Reads the timer identified by `timer`.
    ///
    /// NOTE(review): presumably the remaining time, and `None` for an unknown timer — confirm
    /// against `process_read_timer`.
    pub fn read_timer(timer: Reference) -> Option<Duration> {
        process_read_timer(timer)
    }

    /// Creates a fresh receiver for the calling process.
    #[must_use]
    pub fn receiver() -> ProcessReceiver<()> {
        ProcessReceiver::new()
    }

    /// Waits for a message of type `T` using a receiver with strict type checking enabled.
    #[must_use]
    pub async fn receive<T: Receivable>() -> Message<T> {
        let strict = ProcessReceiver::new().strict_type_checking();
        strict.receive().await
    }

    /// Spawns `function` as a new process, neither linked nor monitored, and returns its pid.
    pub fn spawn<T>(function: T) -> Pid
    where
        T: Future<Output = ()> + Send + 'static,
        T::Output: Send + 'static,
    {
        // Without a monitor request `spawn_internal` only ever yields the `Pid` variant.
        let SpawnResult::Pid(pid) = spawn_internal(function, false, false) else {
            unreachable!()
        };
        pid
    }

    /// Spawns `function` as a new process linked to the calling process and returns its pid.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn spawn_link<T>(function: T) -> Pid
    where
        T: Future<Output = ()> + Send + 'static,
        T::Output: Send + 'static,
    {
        // Without a monitor request `spawn_internal` only ever yields the `Pid` variant.
        let SpawnResult::Pid(pid) = spawn_internal(function, true, false) else {
            unreachable!()
        };
        pid
    }

    /// Spawns `function` as a new process monitored by the calling process.
    ///
    /// Returns the new pid together with the monitor's reference.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn spawn_monitor<T>(function: T) -> (Pid, Reference)
    where
        T: Future<Output = ()> + Send + 'static,
        T::Output: Send + 'static,
    {
        // With a monitor request `spawn_internal` always yields the `PidMonitor` variant.
        let SpawnResult::PidMonitor(pid, monitor) = spawn_internal(function, false, true) else {
            unreachable!()
        };
        (pid, monitor)
    }

    /// Reports whether the process identified by `pid` is alive.
    #[must_use]
    pub fn alive(pid: Pid) -> bool {
        process_alive(pid)
    }

    /// Suspends the calling task for `duration` using tokio's timer.
    pub async fn sleep(duration: Duration) {
        tokio::time::sleep(duration).await;
    }

    /// Drives `future` to completion unless `duration` elapses first.
    ///
    /// # Errors
    ///
    /// Returns [`Timeout`] when the underlying `pingora_timeout::timeout` reports an error.
    pub async fn timeout<F>(duration: Duration, future: F) -> Result<<F as Future>::Output, Timeout>
    where
        F: Future,
    {
        match pingora_timeout::timeout(duration, future).await {
            Ok(output) => Ok(output),
            Err(_) => Err(Timeout),
        }
    }

    /// Registers `pid` under `name`.
    ///
    /// # Errors
    ///
    /// Returns the [`ArgumentError`] produced by `process_register`.
    pub fn register<S: Into<String>>(pid: Pid, name: S) -> Result<(), ArgumentError> {
        let name: String = name.into();
        process_register(pid, name)
    }

    /// Removes the registration stored under `name`.
    pub fn unregister<S: AsRef<str>>(name: S) {
        let name: &str = name.as_ref();
        process_unregister(name);
    }

    /// Returns the list of registered process names.
    #[must_use]
    pub fn registered() -> Vec<String> {
        process_name_list()
    }

    /// Returns the list of known process ids.
    #[must_use]
    pub fn list() -> Vec<Pid> {
        process_list()
    }

    /// Links the calling process with `pid`. Linking a process to itself is a no-op.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn link(pid: Pid) {
        let me = Self::current();
        if pid != me {
            link_install(pid, me);
        }
    }

    /// Removes the link between the calling process and `pid`. Unlinking from itself is a no-op.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn unlink(pid: Pid) {
        let me = Self::current();
        if pid != me {
            link_destroy(pid, me);
        }
    }

    /// Installs a monitor on `process` on behalf of the calling process.
    ///
    /// Returns the reference identifying the monitor.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn monitor<T: Into<Dest>>(process: T) -> Reference {
        let watcher = Self::current();
        let target: Dest = process.into();
        let monitor = Reference::new();
        monitor_install(target, monitor, watcher);
        monitor
    }

    /// Installs a monitor on `process` and creates an alias that shares the monitor's reference.
    ///
    /// `reply` is forwarded to `alias_create` unchanged.
    ///
    /// NOTE(review): unlike [`Process::alias`], the alias id is not recorded in the calling
    /// process's `aliases` set — confirm that its cleanup on process exit is handled elsewhere.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn monitor_alias<T: Into<Dest>>(process: T, reply: bool) -> Reference {
        let watcher = Self::current();
        let target: Dest = process.into();
        let mailbox = PROCESS.with(|this| this.sender.clone());
        let monitor = Reference::new();
        // The alias has to exist before the monitor is installed; both share one reference.
        alias_create(mailbox, monitor, reply);
        monitor_install(target, monitor, watcher);
        monitor
    }

    /// Removes the monitor identified by `monitor` from the calling process.
    ///
    /// Nothing happens when the calling process does not own such a monitor. When the monitor
    /// has a resolved pid, the monitor and the alias sharing its reference are destroyed.
    ///
    /// NOTE(review): a monitor without a resolved pid is removed locally but `alias_destroy` is
    /// not called for it — confirm that this is intended.
    ///
    /// # Panics
    ///
    /// Panics when `monitor` refers to a monitor that is not a process monitor, or when called
    /// from outside of a process.
    pub fn demonitor(monitor: Reference) {
        let removed = PROCESS.with(|this| this.monitors.borrow_mut().remove(&monitor));
        match removed {
            Some(ProcessMonitor::ForProcess(Some(pid))) => {
                monitor_destroy(pid, monitor);
                alias_destroy(monitor);
            }
            // Either no such monitor, or one that never resolved to a pid: nothing left to do.
            Some(ProcessMonitor::ForProcess(None)) | None => {}
            Some(_) => panic!("Invalid process monitor reference!"),
        }
    }

    /// Returns the flags of the calling process.
    ///
    /// # Panics
    ///
    /// Panics when `process_flags` has no entry for the calling process, or when called from
    /// outside of a process.
    #[must_use]
    pub fn flags() -> ProcessFlags {
        let me = Self::current();
        process_flags(me).unwrap()
    }

    /// Replaces the flags of the calling process with `flags`.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn set_flags(flags: ProcessFlags) {
        let me = Self::current();
        process_set_flags(me, flags)
    }

    /// Sends an exit signal with `exit_reason` to `pid` on behalf of the calling process.
    ///
    /// Local pids are handled by `process_exit`, all others by `node_process_send_exit`.
    ///
    /// # Panics
    ///
    /// Panics when called from outside of a process.
    pub fn exit<E: Into<ExitReason>>(pid: Pid, exit_reason: E) {
        let reason: ExitReason = exit_reason.into();
        let local = pid.is_local();
        let from = Self::current();
        if local {
            process_exit(pid, from, reason);
        } else {
            node_process_send_exit(pid, from, reason);
        }
    }

    /// Collects information about the local process `pid`.
    ///
    /// Returns `None` when `process_info` has nothing for `pid`; otherwise the info is completed
    /// with link and monitor data before it is returned.
    ///
    /// # Panics
    ///
    /// Panics when `pid` is a remote pid.
    #[must_use]
    pub fn info(pid: Pid) -> Option<ProcessInfo> {
        if pid.is_remote() {
            panic!("Can't query information on a remote process!");
        }
        let mut details = process_info(pid)?;
        link_fill_info(pid, &mut details);
        monitor_fill_info(pid, &mut details);
        Some(details)
    }
}
impl Drop for Process {
    /// Tears the process down once its task-local state goes away.
    ///
    /// In order: the process is removed via `process_drop`, its registered name (if any) is
    /// removed, links and monitors pointing at it are told that it went down, and finally the
    /// monitors and aliases tracked by this process are destroyed.
    ///
    /// # Panics
    ///
    /// Panics when `process_drop` has no entry for this pid.
    fn drop(&mut self) {
        let pid = self.pid;
        let registration = process_drop(pid).unwrap();

        if let Some(name) = registration.name {
            process_name_remove(&name);
        }

        // A process without a recorded exit reason goes down with the default reason.
        let reason = registration.exit_reason.unwrap_or_default();
        link_process_down(pid, reason.clone());
        monitor_process_down(pid, reason);

        let monitors = self.monitors.borrow();
        monitor_destroy_all(monitors.iter());

        let aliases = self.aliases.borrow();
        alias_destroy_all(aliases.iter());
    }
}
/// Outcome of `spawn_internal`, depending on whether a monitor was requested.
enum SpawnResult {
/// The process was spawned without a monitor.
Pid(Pid),
/// The process was spawned with a monitor; carries the pid and the monitor's reference.
PidMonitor(Pid, Reference),
}
/// Counter handing out ids for locally spawned processes; the first id handed out is 1.
static ID: AtomicU64 = AtomicU64::new(1);
/// Spawns `function` as a new local process, optionally linking and/or monitoring it.
///
/// * `link` - create a link in both directions between the new and the calling process.
/// * `monitor` - have the calling process monitor the new process.
///
/// Returns `SpawnResult::PidMonitor` when `monitor` is set and `SpawnResult::Pid` otherwise.
///
/// # Panics
///
/// Panics when `link` or `monitor` is requested from outside of a process.
fn spawn_internal<T>(function: T, link: bool, monitor: bool) -> SpawnResult
where
    T: Future<Output = ()> + Send + 'static,
    T::Output: Send + 'static,
{
    let id = ID.fetch_add(1, Ordering::Relaxed);
    let (sender, receiver) = flume::unbounded();
    let pid = Pid::local(id);
    let child = Process::new(pid, sender.clone(), receiver);

    if link {
        let parent = Process::current();
        link_create(pid, parent, true);
        link_create(parent, pid, true);
    }

    let outcome = if monitor {
        let reference = Reference::new();
        // The monitor is tracked by the *calling* process, which is the one still in scope here.
        PROCESS.with(|parent| {
            parent
                .monitors
                .borrow_mut()
                .insert(reference, ProcessMonitor::ForProcess(Some(pid)));
        });
        monitor_create(pid, reference, Process::current(), Some(pid.into()));
        SpawnResult::PidMonitor(pid, reference)
    } else {
        SpawnResult::Pid(pid)
    };

    // Run the user's future with the new process state in scope; a caught panic becomes the
    // process's exit reason.
    let task = PROCESS.scope(child, async move {
        if let Err(failure) = AsyncCatchUnwind::new(AssertUnwindSafe(function)).await {
            process_set_exit_reason(Process::current(), failure.into());
        }
    });
    // NOTE(review): the task is spawned before the registration below, while dropping `Process`
    // unwraps `process_drop` — confirm a task that finishes immediately cannot get there first.
    let handle = tokio::spawn(task);
    process_insert(id, ProcessRegistration::new(handle, sender));

    outcome
}