pub mod config;
pub mod env;
pub mod mailbox;
pub mod message;
pub mod runtimes;
pub mod state;
pub mod wasm;
use std::{collections::HashMap, fmt::Debug, future::Future, hash::Hash, sync::Arc};
use anyhow::{anyhow, Result};
use env::Environment;
use log::{debug, log_enabled, trace, warn, Level};
use smallvec::SmallVec;
use state::ProcessState;
use tokio::{
sync::{
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
Mutex,
},
task::JoinHandle,
};
use crate::{mailbox::MessageMailbox, message::Message};
#[cfg(feature = "metrics")]
/// Registers a unit and a human-readable description for every metric
/// described by this module.
///
/// Only compiled when the `metrics` feature is enabled.
pub fn describe_metrics() {
    use metrics::{describe_counter, describe_gauge, describe_histogram, Unit};

    // Signal traffic.
    describe_counter!(
        "lunatic.process.signals.send",
        Unit::Count,
        "Number of signals sent to processes since startup"
    );
    describe_counter!(
        "lunatic.process.signals.received",
        Unit::Count,
        "Number of signals received by processes since startup"
    );

    // Message traffic.
    describe_counter!(
        "lunatic.process.messages.send",
        Unit::Count,
        "Number of messages sent to processes since startup"
    );
    describe_gauge!(
        "lunatic.process.messages.outstanding",
        Unit::Count,
        "Current number of messages that are ready to be consumed by the process"
    );

    // Links.
    describe_gauge!(
        "lunatic.process.links.alive",
        Unit::Count,
        "Number of links currently alive"
    );

    // Per-message details.
    describe_counter!(
        "lunatic.process.messages.data.count",
        Unit::Count,
        "Number of data messages sent since startup"
    );
    describe_histogram!(
        "lunatic.process.messages.data.resources.count",
        Unit::Count,
        "Number of resources used by each individual data message"
    );
    describe_histogram!(
        "lunatic.process.messages.data.size",
        Unit::Bytes,
        "Number of bytes used by each individual data message"
    );
    describe_counter!(
        "lunatic.process.messages.link_died.count",
        Unit::Count,
        "Number of LinkDied messages sent since startup"
    );

    // Environments.
    describe_gauge!(
        "lunatic.process.environment.process.count",
        Unit::Count,
        "Number of currently registered processes"
    );
    describe_gauge!(
        "lunatic.process.environment.count",
        Unit::Count,
        "Number of currently active environments"
    );
}
/// A handle through which signals can be delivered to a running process.
///
/// Implementors must be `Send + Sync`; in this file the trait is used as
/// `Arc<dyn Process>` inside [`Signal`] variants.
pub trait Process: Send + Sync {
/// Returns the numeric id of the process behind this handle.
fn id(&self) -> u64;
/// Delivers `signal` to the process. The method returns nothing, so callers
/// get no indication of whether the signal was accepted.
fn send(&self, signal: Signal);
}
impl Debug for dyn Process {
    /// Formats the handle as `Process { id: <id> }`.
    ///
    /// Only the id is printed because it is the sole piece of state exposed
    /// by the [`Process`] trait.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Process").field("id", &self.id()).finish()
    }
}
impl Hash for dyn Process {
    /// Feeds only the process id into the hasher, so two handles with the
    /// same id hash identically.
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let process_id: u64 = self.id();
        Hash::hash(&process_id, state);
    }
}
/// Control signals delivered to a process through its signal mailbox.
///
/// The per-variant notes describe how the process loop in this file's `new`
/// function reacts to each signal.
pub enum Signal {
/// A message for the process; it is pushed into the message mailbox.
Message(Message),
/// Stops the process; it finishes with an error.
Kill,
/// Sets whether a failed link kills this process (`true`, the initial
/// setting) or is turned into a `Message::LinkDied` instead (`false`).
DieWhenLinkDies(bool),
/// Links the given process to this one, with an optional tag. Linked
/// processes are sent `LinkDied` when this process finishes.
Link(Option<i64>, Arc<dyn Process>),
/// Removes the link to the process with the given id.
UnLink { process_id: u64 },
/// A linked process finished: its id, the tag of the link, and why it died.
LinkDied(u64, Option<i64>, DeathReason),
/// Registers the given process as a monitor of this one. Monitors are sent
/// `ProcessDied` when this process finishes.
Monitor(Arc<dyn Process>),
/// Removes the monitor with the given process id.
StopMonitoring { process_id: u64 },
/// A process with the given id finished; forwarded to the message mailbox
/// as `Message::ProcessDied`.
ProcessDied(u64),
}
impl Debug for Signal {
    /// Writes a short label for the signal.
    ///
    /// Payloads are mostly omitted: only process ids (for link/monitor
    /// signals) and the death reason (for `LinkDied`) are included.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Variants whose label carries no data.
            Self::Message(_) => f.write_str("Message"),
            Self::Kill => f.write_str("Kill"),
            Self::DieWhenLinkDies(_) => f.write_str("DieWhenLinkDies"),
            Self::ProcessDied(_) => f.write_str("ProcessDied"),
            // Variants that print the id of the process they refer to.
            Self::Link(_, process) => write!(f, "Link {}", process.id()),
            Self::Monitor(process) => write!(f, "Monitor {}", process.id()),
            Self::UnLink { process_id } => write!(f, "UnLink {process_id}"),
            Self::StopMonitoring { process_id } => write!(f, "UnMonitor {process_id}"),
            // Only the reason is printed; id and tag are dropped.
            Self::LinkDied(_, _, reason) => write!(f, "LinkDied {reason:?}"),
        }
    }
}
/// Why a process finished, as carried by `Signal::LinkDied`.
#[derive(Clone, Copy, Debug)]
pub enum DeathReason {
/// The process finished with an `Ok` result.
Normal,
/// The process finished with an error (failure or kill).
Failure,
/// Handled the same way as `Failure` by a process that receives it.
/// NOTE(review): presumably sent when the link target does not exist; the
/// producer is not in this file, so confirm.
NoProcess,
}
/// How the process loop ended.
pub enum Finished<T> {
/// The process future completed on its own and produced this output.
Normal(T),
/// The loop was stopped by a signal (`Kill`, or a failed link while the
/// process is configured to die with its links).
KillSignal,
}
/// A [`Process`] handle labelled as kind `"wasm"` in metrics.
///
/// Holds the process id and the sending half of the signal channel.
#[derive(Debug, Clone)]
pub struct WasmProcess {
// Id reported by `Process::id`.
id: u64,
// Channel used by `Process::send` to deliver signals.
signal_mailbox: UnboundedSender<Signal>,
}
impl WasmProcess {
pub fn new(id: u64, signal_mailbox: UnboundedSender<Signal>) -> Self {
Self { id, signal_mailbox }
}
}
impl Process for WasmProcess {
    /// Returns the id this handle was created with.
    fn id(&self) -> u64 {
        self.id
    }

    /// Queues `signal` on the process' signal channel.
    ///
    /// With the `metrics` feature enabled, the `lunatic.process.signals.send`
    /// counter is incremented first.
    fn send(&self, signal: Signal) {
        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
        let labels = [("process_kind", "wasm")];
        // Array elements must share one type, so the static kind is converted
        // to a `String` to match the formatted process id.
        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
        let labels = [
            ("process_kind", "wasm".to_owned()),
            ("process_id", self.id().to_string()),
        ];
        #[cfg(feature = "metrics")]
        metrics::increment_counter!("lunatic.process.signals.send", &labels);

        // Best effort: a failed send is intentionally ignored.
        let _ = self.signal_mailbox.send(signal);
    }
}
/// How a process is referred to in log output: by a list of names, or by its
/// numeric id.
enum NameOrID<'a> {
/// Zero or more borrowed names; up to two are stored inline.
Names(SmallVec<[&'a str; 2]>),
/// The numeric process id.
ID(u64),
}
impl<'a> NameOrID<'a> {
    /// Keeps `self` if it is a non-empty list of names; otherwise replaces it
    /// with `NameOrID::ID(id)`.
    fn or_id(self, id: u64) -> Self {
        let has_names = matches!(&self, NameOrID::Names(names) if !names.is_empty());
        if has_names {
            self
        } else {
            NameOrID::ID(id)
        }
    }
}
impl<'a> std::fmt::Display for NameOrID<'a> {
    /// Prints the id as a plain number, or the names as single-quoted
    /// entries separated by ` / `. An empty name list prints nothing.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NameOrID::ID(id) => write!(f, "{id}"),
            NameOrID::Names(names) => {
                let mut remaining = names.iter();
                // The first name is written without a leading separator.
                if let Some(name) = remaining.next() {
                    write!(f, "'{name}'")?;
                }
                for name in remaining {
                    write!(f, " / ")?;
                    write!(f, "'{name}'")?;
                }
                Ok(())
            }
        }
    }
}
impl<'a> FromIterator<&'a str> for NameOrID<'a> {
    /// Collects every yielded string slice into a `NameOrID::Names`.
    ///
    /// An empty iterator yields an empty `Names`, not an `ID`.
    fn from_iter<T: IntoIterator<Item = &'a str>>(iter: T) -> Self {
        NameOrID::Names(iter.into_iter().collect())
    }
}
/// Drives a process future to completion while servicing its signal mailbox.
///
/// The loop polls two sources, signals first (`biased`):
/// * incoming [`Signal`]s, which update link/monitor bookkeeping, forward
///   messages into `message_mailbox`, or stop the process;
/// * the process future `fut` itself.
///
/// After the loop ends the process is removed from `env`, every linked
/// process is sent `Signal::LinkDied` and every monitor is sent
/// `Signal::ProcessDied`.
///
/// # Errors
///
/// Returns an error if the process was stopped by a signal (`Kill`, or a
/// failed link while `die_when_link_dies` is set), or if the execution result
/// produced by `fut` reports a failure.
pub(crate) async fn new<F, S, R>(
    fut: F,
    id: u64,
    env: Arc<dyn Environment>,
    signal_mailbox: Arc<Mutex<UnboundedReceiver<Signal>>>,
    message_mailbox: MessageMailbox,
) -> Result<S>
where
    S: ProcessState,
    R: Into<ExecutionResult<S>>,
    F: Future<Output = R> + Send + 'static,
{
    trace!("Process {} spawned", id);
    tokio::pin!(fut);

    // Defaults: a failed link kills this process.
    let mut die_when_link_dies = true;
    // Process id -> (handle, tag) for every linked process.
    let mut links = HashMap::new();
    // Process id -> handle for every process monitoring this one.
    let mut monitors = HashMap::new();
    let mut signal_mailbox = signal_mailbox.lock().await;
    // Cleared once `recv` yields `None`, which disables the signal branch below.
    let mut has_sender = true;

    #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
    let labels: [(String, String); 0] = [];
    #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
    let labels = [("process_id", id.to_string())];

    let result = loop {
        tokio::select! {
            biased;

            signal = signal_mailbox.recv(), if has_sender => {
                #[cfg(feature = "metrics")]
                metrics::increment_counter!("lunatic.process.signals.received", &labels);

                match signal {
                    Some(Signal::Message(message)) => {
                        #[cfg(feature = "metrics")]
                        message.write_metrics();

                        message_mailbox.push(message);

                        #[cfg(feature = "metrics")]
                        metrics::increment_counter!("lunatic.process.messages.send", &labels);
                        #[cfg(feature = "metrics")]
                        metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
                    },
                    Some(Signal::DieWhenLinkDies(value)) => die_when_link_dies = value,
                    Some(Signal::Link(tag, proc)) => {
                        links.insert(proc.id(), (proc, tag));

                        #[cfg(feature = "metrics")]
                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
                    },
                    Some(Signal::UnLink { process_id }) => {
                        links.remove(&process_id);

                        #[cfg(feature = "metrics")]
                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);
                    }
                    Some(Signal::Kill) => break Finished::KillSignal,
                    Some(Signal::LinkDied(id, tag, reason)) => {
                        // The dead process no longer needs to be notified.
                        links.remove(&id);

                        #[cfg(feature = "metrics")]
                        metrics::gauge!("lunatic.process.links.alive", links.len() as f64, &labels);

                        match reason {
                            DeathReason::Failure | DeathReason::NoProcess => {
                                if die_when_link_dies {
                                    break Finished::KillSignal
                                } else {
                                    let message = Message::LinkDied(tag);
                                    // Push first so the gauge below includes this message,
                                    // matching the `Signal::Message` branch.
                                    message_mailbox.push(message);

                                    #[cfg(feature = "metrics")]
                                    metrics::increment_counter!("lunatic.process.messages.send", &labels);
                                    #[cfg(feature = "metrics")]
                                    metrics::gauge!("lunatic.process.messages.outstanding", message_mailbox.len() as f64, &labels);
                                }
                            },
                            // A link that finished normally is dropped silently.
                            DeathReason::Normal => {},
                        }
                    },
                    Some(Signal::Monitor(proc)) => {
                        monitors.insert(proc.id(), proc);
                    }
                    Some(Signal::StopMonitoring { process_id }) => {
                        monitors.remove(&process_id);
                    }
                    Some(Signal::ProcessDied(id)) => {
                        message_mailbox.push(Message::ProcessDied(id));
                    }
                    None => {
                        // The channel yielded `None`; stop polling it so only
                        // the process future is awaited from now on.
                        debug_assert!(has_sender);
                        has_sender = false;
                    }
                }
            }
            output = &mut fut => { break Finished::Normal(output); }
        }
    };

    env.remove_process(id);

    let result = match result {
        Finished::Normal(result) => {
            let result: ExecutionResult<_> = result.into();

            if let Some(failure) = result.failure() {
                // Prefer registered names over the bare id in the log line.
                let registry = result.state().registry().read().await;
                let name = registry
                    .iter()
                    .filter(|(_, (_, process_id))| process_id == &id)
                    .map(|(name, _)| name.splitn(4, '/').last().unwrap_or(name.as_str()))
                    .collect::<NameOrID>()
                    .or_id(id);
                warn!(
                    "Process {} failed, notifying: {} links {}",
                    name,
                    links.len(),
                    if !log_enabled!(Level::Debug) {
                        "\n\t\t\t (Set ENV variable `RUST_LOG=lunatic=debug` to show stacktrace)"
                    } else {
                        ""
                    }
                );
                debug!("{}", failure);
                Err(anyhow!(failure.to_string()))
            } else {
                Ok(result.into_state())
            }
        }
        Finished::KillSignal => {
            warn!(
                "Process {} was killed, notifying: {} links",
                id,
                links.len()
            );
            Err(anyhow!("Process received Kill signal"))
        }
    };

    let reason = match result {
        Ok(_) => DeathReason::Normal,
        Err(_) => DeathReason::Failure,
    };

    // Tell linked processes, then monitors, that this process is gone.
    for (proc, tag) in links.values() {
        proc.send(Signal::LinkDied(id, *tag, reason));
    }
    for proc in monitors.values() {
        proc.send(Signal::ProcessDied(id));
    }

    result
}
/// A [`Process`] handle labelled as kind `"native"` in metrics; created by
/// [`spawn`].
#[derive(Clone, Debug)]
pub struct NativeProcess {
// Id reported by `Process::id`.
id: u64,
// Channel used by `Process::send` to deliver signals.
signal_mailbox: UnboundedSender<Signal>,
}
/// Spawns a native process on the tokio runtime.
///
/// A fresh process id is taken from `env`, a signal channel and a default
/// message mailbox are created, and `func` is called with a handle to the new
/// process and a clone of its mailbox to build the future that will run.
///
/// Returns the tokio join handle of the spawned task together with a
/// [`NativeProcess`] handle that can be used to send signals to it.
pub fn spawn<T, F, K, R>(
    env: Arc<dyn Environment>,
    func: F,
) -> (JoinHandle<Result<T>>, NativeProcess)
where
    T: ProcessState + Send + Sync + 'static,
    R: Into<ExecutionResult<T>> + Send + 'static,
    K: Future<Output = R> + Send + 'static,
    F: FnOnce(NativeProcess, MessageMailbox) -> K,
{
    let id = env.get_next_process_id();
    let (signal_sender, signal_receiver) = unbounded_channel::<Signal>();
    let message_mailbox = MessageMailbox::default();
    let process = NativeProcess {
        id,
        signal_mailbox: signal_sender,
    };

    let fut = func(process.clone(), message_mailbox.clone());
    let signal_mailbox = Arc::new(Mutex::new(signal_receiver));

    // `env` is not used after this point, so it is moved rather than cloned.
    let join = tokio::task::spawn(new(fut, id, env, signal_mailbox, message_mailbox));
    (join, process)
}
impl Process for NativeProcess {
    /// Returns the id this handle was created with.
    fn id(&self) -> u64 {
        self.id
    }

    /// Queues `signal` on the process' signal channel.
    ///
    /// With the `metrics` feature enabled, the `lunatic.process.signals.send`
    /// counter is incremented first.
    fn send(&self, signal: Signal) {
        #[cfg(all(feature = "metrics", not(feature = "detailed_metrics")))]
        let labels = [("process_kind", "native")];
        // Array elements must share one type, so the static kind is converted
        // to a `String` to match the formatted process id.
        #[cfg(all(feature = "metrics", feature = "detailed_metrics"))]
        let labels = [
            ("process_kind", "native".to_owned()),
            ("process_id", self.id().to_string()),
        ];
        #[cfg(feature = "metrics")]
        metrics::increment_counter!("lunatic.process.signals.send", &labels);

        // Best effort: a failed send is intentionally ignored.
        let _ = self.signal_mailbox.send(signal);
    }
}
/// The outcome of running a process: the state it ended with plus a marker
/// saying whether the run succeeded.
pub struct ExecutionResult<T> {
// State of the process after execution.
state: T,
// Success/failure marker for the run.
result: ResultValue,
}
impl<T> ExecutionResult<T> {
pub fn failure(&self) -> Option<&str> {
match self.result {
ResultValue::Failed(ref failure) => Some(failure),
ResultValue::SpawnError(ref failure) => Some(failure),
_ => None,
}
}
pub fn state(&self) -> &T {
&self.state
}
pub fn into_state(self) -> T {
self.state
}
}
impl<T> From<Result<T>> for ExecutionResult<T>
where
T: Default,
{
fn from(result: Result<T>) -> Self {
match result {
Ok(t) => ExecutionResult {
state: t,
result: ResultValue::Ok,
},
Err(e) => ExecutionResult {
state: T::default(),
result: ResultValue::Failed(e.to_string()),
},
}
}
}
/// Success/failure marker stored in an [`ExecutionResult`].
pub enum ResultValue {
/// The run succeeded.
Ok,
/// The run failed; the string describes the failure.
Failed(String),
/// Reported as a failure exactly like `Failed`.
/// NOTE(review): presumably produced when spawning fails; the producer is
/// not in this file, so confirm.
SpawnError(String),
}