#[cfg(feature = "telemetry")]
use std::time::Instant;
#[cfg(feature = "telemetry")]
use metrics::{counter, gauge, histogram};
#[cfg(feature = "telemetry")]
use std::sync::atomic::{AtomicU32, Ordering};
#[cfg(feature = "telemetry")]
use std::sync::OnceLock;
#[cfg(feature = "telemetry")]
use tracing::{debug_span, info_span};
pub trait TelemetryProvider: Send + Sync {
fn on_actor_spawned(&self, actor_type: &str, pid: &str) {
let _ = (actor_type, pid);
}
fn on_actor_stopped(&self, actor_type: &str, pid: &str, reason: &str) {
let _ = (actor_type, pid, reason);
}
fn on_actor_panicked(&self, actor_type: &str, pid: &str, error: &str) {
let _ = (actor_type, pid, error);
}
fn on_message_sent(&self, from_pid: &str, to_pid: &str) {
let _ = (from_pid, to_pid);
}
fn on_message_received(&self, pid: &str) {
let _ = pid;
}
fn on_supervisor_restart(&self, child_type: &str, strategy: &str) {
let _ = (child_type, strategy);
}
}
#[derive(Debug, Clone, Copy)]
pub struct TelemetryConfig {
pub message_sampling_rate: u32,
pub signal_sampling_rate: u32,
pub queue_wait_sampling_rate: u32,
}
impl Default for TelemetryConfig {
fn default() -> Self {
Self {
message_sampling_rate: 100,
signal_sampling_rate: 100,
queue_wait_sampling_rate: 100,
}
}
}
#[cfg(feature = "telemetry")]
static TELEMETRY_CONFIG: OnceLock<TelemetryConfig> = OnceLock::new();
#[cfg(feature = "telemetry")]
static SAMPLE_COUNTER: AtomicU32 = AtomicU32::new(0);
#[cfg(feature = "telemetry")]
static CUSTOM_PROVIDER: OnceLock<Box<dyn TelemetryProvider>> = OnceLock::new();
pub struct TelemetrySpan {
#[cfg(feature = "telemetry")]
start: Instant,
#[cfg(feature = "telemetry")]
metric_name: &'static str,
#[cfg(feature = "telemetry")]
should_record: bool,
}
impl TelemetrySpan {
#[inline]
pub fn new(_metric_name: &'static str) -> Self {
Self {
#[cfg(feature = "telemetry")]
start: Instant::now(),
#[cfg(feature = "telemetry")]
metric_name: _metric_name,
#[cfg(feature = "telemetry")]
should_record: true,
}
}
#[inline]
pub fn new_sampled(_metric_name: &'static str, _should_record: bool) -> Self {
Self {
#[cfg(feature = "telemetry")]
start: Instant::now(),
#[cfg(feature = "telemetry")]
metric_name: _metric_name,
#[cfg(feature = "telemetry")]
should_record: _should_record,
}
}
#[cfg(feature = "telemetry")]
pub fn finish(self) {
if self.should_record {
let duration = self.start.elapsed();
histogram!(self.metric_name).record(duration.as_secs_f64());
}
}
#[cfg(not(feature = "telemetry"))]
pub fn finish(self) {}
}
#[cfg(feature = "telemetry")]
impl Drop for TelemetrySpan {
fn drop(&mut self) {
if self.should_record {
let duration = self.start.elapsed();
histogram!(self.metric_name).record(duration.as_secs_f64());
}
}
}
pub fn actor_type_name<T: ?Sized>() -> &'static str {
let full_name = std::any::type_name::<T>();
full_name.rsplit("::").next().unwrap_or(full_name)
}
pub struct ActorMetrics;
impl ActorMetrics {
#[inline]
pub fn actor_spawned_typed(actor_type: &str) {
#[cfg(feature = "telemetry")]
{
counter!("joerl_actors_spawned_total", "type" => actor_type.to_string()).increment(1);
gauge!("joerl_actors_active", "type" => actor_type.to_string()).increment(1.0);
}
}
#[inline]
pub fn actor_lifetime(actor_type: &str, lifetime_secs: f64) {
#[cfg(feature = "telemetry")]
{
histogram!("joerl_actor_lifetime_seconds", "type" => actor_type.to_string())
.record(lifetime_secs);
if lifetime_secs < 1.0 {
counter!("joerl_short_lived_actors_total", "type" => actor_type.to_string())
.increment(1);
}
}
}
#[inline]
pub fn actor_spawned() {
Self::actor_spawned_typed("unknown")
}
#[inline]
pub fn actor_stopped_typed(actor_type: &str, _reason: &str) {
#[cfg(feature = "telemetry")]
{
counter!("joerl_actors_stopped_total",
"type" => actor_type.to_string(),
"reason" => _reason.to_string()
)
.increment(1);
gauge!("joerl_actors_active", "type" => actor_type.to_string()).decrement(1.0);
}
}
#[inline]
pub fn actor_stopped(_reason: &str) {
Self::actor_stopped_typed("unknown", _reason)
}
#[inline]
pub fn actor_panicked_typed(actor_type: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_actors_panicked_total", "type" => actor_type.to_string()).increment(1);
}
#[inline]
pub fn actor_panicked() {
Self::actor_panicked_typed("unknown")
}
#[cfg(feature = "telemetry")]
#[inline]
pub fn actor_spawn_span(actor_type: &str, pid: &str) -> tracing::Span {
info_span!(
"actor.spawn",
actor.type = actor_type,
actor.pid = pid,
otel.kind = "internal"
)
}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub fn actor_spawn_span(_actor_type: &str, _pid: &str) -> tracing::Span {
tracing::Span::none()
}
#[cfg(feature = "telemetry")]
#[inline]
pub fn actor_lifecycle_span(event: &str, actor_type: &str, pid: &str) -> tracing::Span {
info_span!(
"actor.lifecycle",
actor.event = event,
actor.type = actor_type,
actor.pid = pid
)
}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub fn actor_lifecycle_span(_event: &str, _actor_type: &str, _pid: &str) -> tracing::Span {
tracing::Span::none()
}
}
pub struct MessageMetrics;
impl MessageMetrics {
#[inline]
pub fn message_sent() {
#[cfg(feature = "telemetry")]
counter!("joerl_messages_sent_total").increment(1);
}
#[inline]
pub fn message_send_failed(_reason: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_messages_sent_failed_total", "reason" => _reason.to_string()).increment(1);
}
#[inline]
pub fn message_processed() {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.message_sampling_rate) {
counter!("joerl_messages_processed_total").increment(1);
}
}
}
#[inline]
pub fn message_processing_span() -> TelemetrySpan {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.message_sampling_rate) {
return TelemetrySpan::new_sampled(
"joerl_message_processing_duration_seconds",
true,
);
}
}
TelemetrySpan::new_sampled("joerl_message_processing_duration_seconds", false)
}
#[inline]
pub fn message_queue_wait(wait_time_secs: f64) {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.queue_wait_sampling_rate) {
histogram!("joerl_message_queue_wait_seconds").record(wait_time_secs);
}
}
}
#[inline]
pub fn mailbox_depth_typed(actor_type: &str, _depth: usize, _capacity: usize) {
#[cfg(feature = "telemetry")]
{
gauge!("joerl_mailbox_depth", "type" => actor_type.to_string()).set(_depth as f64);
let utilization = if _capacity > 0 {
(_depth as f64 / _capacity as f64) * 100.0
} else {
0.0
};
gauge!("joerl_mailbox_utilization_percent", "type" => actor_type.to_string())
.set(utilization);
}
}
#[inline]
pub fn mailbox_depth(_depth: usize) {
#[cfg(feature = "telemetry")]
gauge!("joerl_mailbox_depth").set(_depth as f64);
}
#[inline]
pub fn mailbox_full_typed(actor_type: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_mailbox_full_total", "type" => actor_type.to_string()).increment(1);
}
#[inline]
pub fn mailbox_full() {
#[cfg(feature = "telemetry")]
counter!("joerl_mailbox_full_total").increment(1);
}
#[cfg(feature = "telemetry")]
#[inline]
pub fn message_send_span(from_pid: &str, to_pid: &str) -> tracing::Span {
debug_span!(
"message.send",
messaging.operation = "send",
messaging.system = "joerl",
actor.from_pid = from_pid,
actor.to_pid = to_pid,
otel.kind = "producer"
)
}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub fn message_send_span(_from_pid: &str, _to_pid: &str) -> tracing::Span {
tracing::Span::none()
}
#[cfg(feature = "telemetry")]
#[inline]
pub fn message_receive_span(to_pid: &str, parent_span_id: Option<&str>) -> tracing::Span {
let span = debug_span!(
"message.receive",
messaging.operation = "receive",
messaging.system = "joerl",
actor.pid = to_pid,
parent.span.id = parent_span_id.unwrap_or("none"),
otel.kind = "consumer"
);
span
}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub fn message_receive_span(_to_pid: &str, _parent_span_id: Option<&str>) -> tracing::Span {
tracing::Span::none()
}
}
pub struct LinkMetrics;
impl LinkMetrics {
#[inline]
pub fn link_created() {
#[cfg(feature = "telemetry")]
counter!("joerl_links_created_total").increment(1);
}
#[inline]
pub fn monitor_created() {
#[cfg(feature = "telemetry")]
counter!("joerl_monitors_created_total").increment(1);
}
}
pub struct SupervisorMetrics;
impl SupervisorMetrics {
#[inline]
pub fn child_restarted(_strategy: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_supervisor_restarts_total", "strategy" => _strategy.to_string())
.increment(1);
}
#[inline]
pub fn restart_intensity_exceeded() {
#[cfg(feature = "telemetry")]
counter!("joerl_supervisor_restart_intensity_exceeded_total").increment(1);
}
#[inline]
pub fn restart_span() -> TelemetrySpan {
TelemetrySpan::new("joerl_supervisor_restart_duration_seconds")
}
}
pub struct GenServerMetrics;
impl GenServerMetrics {
#[inline]
pub fn call_span(server_type: &str) -> GenServerCallSpan {
GenServerCallSpan {
#[cfg(feature = "telemetry")]
start: Instant::now(),
#[cfg(feature = "telemetry")]
server_type: server_type.to_string(),
}
}
#[inline]
pub fn cast(server_type: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_gen_server_casts_total", "type" => server_type.to_string()).increment(1);
}
#[inline]
pub fn call_timeout(server_type: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_gen_server_call_timeouts_total", "type" => server_type.to_string())
.increment(1);
}
#[inline]
pub fn calls_in_flight_inc(server_type: &str) {
#[cfg(feature = "telemetry")]
gauge!("joerl_gen_server_calls_in_flight", "type" => server_type.to_string())
.increment(1.0);
}
#[inline]
pub fn calls_in_flight_dec(server_type: &str) {
#[cfg(feature = "telemetry")]
gauge!("joerl_gen_server_calls_in_flight", "type" => server_type.to_string())
.decrement(1.0);
}
}
pub struct GenServerCallSpan {
#[cfg(feature = "telemetry")]
start: Instant,
#[cfg(feature = "telemetry")]
server_type: String,
}
impl GenServerCallSpan {
#[cfg(feature = "telemetry")]
pub fn finish(self) {
let duration = self.start.elapsed();
histogram!("joerl_gen_server_call_duration_seconds", "type" => self.server_type.clone())
.record(duration.as_secs_f64());
}
#[cfg(not(feature = "telemetry"))]
pub fn finish(self) {}
}
#[cfg(feature = "telemetry")]
impl Drop for GenServerCallSpan {
fn drop(&mut self) {
let duration = self.start.elapsed();
histogram!("joerl_gen_server_call_duration_seconds", "type" => self.server_type.clone())
.record(duration.as_secs_f64());
}
}
pub struct SignalMetrics;
impl SignalMetrics {
#[inline]
pub fn signal_sent(signal_type: &str) {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.signal_sampling_rate) {
counter!("joerl_signals_sent_total", "type" => signal_type.to_string())
.increment(1);
}
}
}
#[inline]
pub fn signal_received(signal_type: &str) {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.signal_sampling_rate) {
counter!("joerl_signals_received_total", "type" => signal_type.to_string())
.increment(1);
}
}
}
#[inline]
pub fn signal_ignored(signal_type: &str) {
#[cfg(feature = "telemetry")]
{
let config = get_config();
if should_sample(config.signal_sampling_rate) {
counter!("joerl_signals_ignored_total", "type" => signal_type.to_string())
.increment(1);
}
}
}
#[inline]
pub fn exit_signal_by_reason(reason: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_exit_signals_by_reason_total", "reason" => reason.to_string()).increment(1);
}
}
pub struct GenStatemMetrics;
impl GenStatemMetrics {
#[inline]
pub fn state_transition(fsm_type: &str, from_state: &str, to_state: &str) {
#[cfg(feature = "telemetry")]
counter!(
"joerl_gen_statem_transitions_total",
"type" => fsm_type.to_string(),
"from" => from_state.to_string(),
"to" => to_state.to_string()
)
.increment(1);
}
#[inline]
pub fn invalid_transition(fsm_type: &str, state: &str, event: &str) {
#[cfg(feature = "telemetry")]
counter!(
"joerl_gen_statem_invalid_transitions_total",
"type" => fsm_type.to_string(),
"state" => state.to_string(),
"event" => event.to_string()
)
.increment(1);
}
#[inline]
pub fn state_duration_span(fsm_type: &str, state: &str) -> GenStatemStateSpan {
GenStatemStateSpan {
#[cfg(feature = "telemetry")]
start: Instant::now(),
#[cfg(feature = "telemetry")]
fsm_type: fsm_type.to_string(),
#[cfg(feature = "telemetry")]
state: state.to_string(),
}
}
#[inline]
pub fn current_state(fsm_type: &str, state: &str) {
#[cfg(feature = "telemetry")]
{
gauge!("joerl_gen_statem_current_state",
"type" => fsm_type.to_string(),
"state" => state.to_string()
)
.set(1.0);
}
}
}
pub struct GenStatemStateSpan {
#[cfg(feature = "telemetry")]
start: Instant,
#[cfg(feature = "telemetry")]
fsm_type: String,
#[cfg(feature = "telemetry")]
state: String,
}
impl GenStatemStateSpan {
#[cfg(feature = "telemetry")]
pub fn finish(self) {
let duration = self.start.elapsed();
histogram!(
"joerl_gen_statem_state_duration_seconds",
"type" => self.fsm_type.clone(),
"state" => self.state.clone()
)
.record(duration.as_secs_f64());
}
#[cfg(not(feature = "telemetry"))]
pub fn finish(self) {}
}
#[cfg(feature = "telemetry")]
impl Drop for GenStatemStateSpan {
fn drop(&mut self) {
let duration = self.start.elapsed();
histogram!(
"joerl_gen_statem_state_duration_seconds",
"type" => self.fsm_type.clone(),
"state" => self.state.clone()
)
.record(duration.as_secs_f64());
}
}
pub fn set_config(config: TelemetryConfig) {
#[cfg(feature = "telemetry")]
{
if TELEMETRY_CONFIG.set(config).is_err() {
tracing::warn!("Telemetry config already set, ignoring new config");
} else {
tracing::info!(
"Telemetry config set: messages={}%, signals={}%, queue_wait={}%",
config.message_sampling_rate,
config.signal_sampling_rate,
config.queue_wait_sampling_rate
);
}
}
}
pub fn get_config() -> TelemetryConfig {
#[cfg(feature = "telemetry")]
{
*TELEMETRY_CONFIG.get_or_init(TelemetryConfig::default)
}
#[cfg(not(feature = "telemetry"))]
TelemetryConfig::default()
}
#[cfg(feature = "telemetry")]
#[inline]
fn should_sample(rate: u32) -> bool {
if rate >= 100 {
return true;
}
if rate == 0 {
return false;
}
let counter = SAMPLE_COUNTER.fetch_add(1, Ordering::Relaxed);
(counter % 100) < rate
}
pub fn set_telemetry_provider(_provider: Box<dyn TelemetryProvider>) {
#[cfg(feature = "telemetry")]
{
if CUSTOM_PROVIDER.set(_provider).is_err() {
tracing::warn!("Telemetry provider already set, ignoring new provider");
} else {
tracing::info!("Custom telemetry provider registered");
}
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_actor_spawned(actor_type: &str, pid: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_actor_spawned(actor_type, pid);
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_actor_stopped(actor_type: &str, pid: &str, reason: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_actor_stopped(actor_type, pid, reason);
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_actor_panicked(actor_type: &str, pid: &str, error: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_actor_panicked(actor_type, pid, error);
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_message_sent(from_pid: &str, to_pid: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_message_sent(from_pid, to_pid);
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_message_received(pid: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_message_received(pid);
}
}
#[cfg(feature = "telemetry")]
#[inline]
#[allow(dead_code)] pub(crate) fn invoke_provider_supervisor_restart(child_type: &str, strategy: &str) {
if let Some(provider) = CUSTOM_PROVIDER.get() {
provider.on_supervisor_restart(child_type, strategy);
}
}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_actor_spawned(_actor_type: &str, _pid: &str) {}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_actor_stopped(_actor_type: &str, _pid: &str, _reason: &str) {}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_actor_panicked(_actor_type: &str, _pid: &str, _error: &str) {}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_message_sent(_from_pid: &str, _to_pid: &str) {}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_message_received(_pid: &str) {}
#[cfg(not(feature = "telemetry"))]
#[inline]
pub(crate) fn invoke_provider_supervisor_restart(_child_type: &str, _strategy: &str) {}
pub struct MemoryMetrics;
impl MemoryMetrics {
#[inline]
pub fn update_system_memory() {
#[cfg(all(feature = "telemetry", target_os = "linux"))]
{
if let Ok(memory_kb) = Self::get_process_memory_linux() {
gauge!("joerl_system_memory_bytes").set(memory_kb as f64 * 1024.0);
}
}
}
#[cfg(all(feature = "telemetry", target_os = "linux"))]
fn get_process_memory_linux() -> Result<usize, std::io::Error> {
use std::fs;
let status = fs::read_to_string("/proc/self/status")?;
for line in status.lines() {
if line.starts_with("VmRSS:") {
if let Some(value) = line.split_whitespace().nth(1) {
return value
.parse::<usize>()
.map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e));
}
}
}
Err(std::io::Error::new(
std::io::ErrorKind::NotFound,
"VmRSS not found",
))
}
#[inline]
pub fn mailbox_memory_typed(actor_type: &str, _depth: usize, _avg_message_size: usize) {
#[cfg(feature = "telemetry")]
{
let estimated_bytes = _depth * _avg_message_size;
gauge!("joerl_mailbox_memory_bytes", "type" => actor_type.to_string())
.set(estimated_bytes as f64);
}
}
#[inline]
pub fn total_mailbox_memory(_total_bytes: usize) {
#[cfg(feature = "telemetry")]
gauge!("joerl_mailbox_memory_total_bytes").set(_total_bytes as f64);
}
}
pub struct DistributedMetrics;
impl DistributedMetrics {
#[inline]
pub fn remote_message_sent(target_node: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_remote_messages_sent_total", "target_node" => target_node.to_string())
.increment(1);
}
#[inline]
pub fn remote_message_failed(target_node: &str, reason: &str) {
#[cfg(feature = "telemetry")]
counter!(
"joerl_remote_messages_failed_total",
"target_node" => target_node.to_string(),
"reason" => reason.to_string()
)
.increment(1);
}
#[inline]
pub fn active_connections(count: usize) {
#[cfg(feature = "telemetry")]
gauge!("joerl_node_connections_active").set(count as f64);
}
#[inline]
pub fn connection_established(node_name: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_node_connection_established_total", "node" => node_name.to_string())
.increment(1);
}
#[inline]
pub fn connection_lost(node_name: &str) {
#[cfg(feature = "telemetry")]
counter!("joerl_node_connection_lost_total", "node" => node_name.to_string()).increment(1);
}
#[inline]
pub fn serialization_error() {
#[cfg(feature = "telemetry")]
counter!("joerl_serialization_errors_total").increment(1);
}
#[inline]
pub fn network_latency(node_name: &str, duration_secs: f64) {
#[cfg(feature = "telemetry")]
histogram!("joerl_network_latency_seconds", "node" => node_name.to_string())
.record(duration_secs);
}
}
pub fn init() {
#[cfg(feature = "telemetry")]
{
let _ = TELEMETRY_CONFIG.get_or_init(TelemetryConfig::default);
tracing::info!("joerl telemetry initialized");
}
}
#[cfg(feature = "telemetry")]
pub fn init_prometheus(addr: &str) -> Result<(), Box<dyn std::error::Error>> {
use std::net::SocketAddr;
let addr: SocketAddr = addr.parse()?;
tracing::info!("Prometheus metrics exporter starting on {}", addr);
Ok(())
}
#[cfg(not(feature = "telemetry"))]
pub fn init_prometheus(_addr: &str) -> Result<(), Box<dyn std::error::Error>> {
Ok(())
}
pub struct RegistryMetrics;
impl RegistryMetrics {
#[inline]
pub fn process_registered() {
#[cfg(feature = "telemetry")]
{
counter!("joerl_registry_registrations_total").increment(1);
gauge!("joerl_registry_size").increment(1.0);
}
}
#[inline]
pub fn process_unregistered() {
#[cfg(feature = "telemetry")]
{
counter!("joerl_registry_unregistrations_total").increment(1);
gauge!("joerl_registry_size").decrement(1.0);
}
}
#[inline]
pub fn lookup_performed() {
#[cfg(feature = "telemetry")]
counter!("joerl_registry_lookups_total").increment(1);
}
#[inline]
pub fn registration_conflict() {
#[cfg(feature = "telemetry")]
counter!("joerl_registry_conflicts_total").increment(1);
}
}
pub struct SchedulerMetrics;
impl SchedulerMetrics {
#[inline]
pub fn message_scheduled() {
#[cfg(feature = "telemetry")]
{
counter!("joerl_scheduled_messages_total").increment(1);
gauge!("joerl_scheduled_messages_active").increment(1.0);
}
}
#[inline]
pub fn timer_cancelled() {
#[cfg(feature = "telemetry")]
{
counter!("joerl_scheduled_messages_cancelled_total").increment(1);
gauge!("joerl_scheduled_messages_active").decrement(1.0);
}
}
#[inline]
pub fn message_delivered() {
#[cfg(feature = "telemetry")]
{
counter!("joerl_scheduled_messages_delivered_total").increment(1);
gauge!("joerl_scheduled_messages_active").decrement(1.0);
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_telemetry_span_creation() {
let _span = TelemetrySpan::new("test_metric");
}
#[test]
fn test_metrics_no_panic() {
ActorMetrics::actor_spawned();
ActorMetrics::actor_stopped("normal");
ActorMetrics::actor_panicked();
MessageMetrics::message_sent();
MessageMetrics::message_send_failed("mailbox_full");
MessageMetrics::message_processed();
MessageMetrics::mailbox_depth(10);
MessageMetrics::mailbox_full();
LinkMetrics::link_created();
LinkMetrics::monitor_created();
SupervisorMetrics::child_restarted("one_for_one");
SupervisorMetrics::restart_intensity_exceeded();
}
#[test]
fn test_span_finish() {
let span = MessageMetrics::message_processing_span();
span.finish();
}
#[test]
fn test_init() {
init();
}
#[test]
fn test_distributed_metrics_no_panic() {
DistributedMetrics::remote_message_sent("node_a");
DistributedMetrics::remote_message_failed("node_b", "connection_failed");
DistributedMetrics::active_connections(5);
DistributedMetrics::connection_established("node_c");
DistributedMetrics::connection_lost("node_d");
DistributedMetrics::serialization_error();
DistributedMetrics::network_latency("node_e", 0.05);
}
}