use opentelemetry::{
global,
metrics::{Counter, Histogram, Meter, UpDownCounter},
KeyValue,
};
use std::time::Instant;
/// Collection of the OpenTelemetry instruments recorded by this module.
///
/// Every instrument is created in `Metrics::new` from a single `Meter`.
/// The fields are public, so callers may record on an instrument directly
/// or go through the `record_*` / `update_*` helper methods.
#[derive(Clone)]
pub struct Metrics {
// The meter the instruments were created from. Nothing in this file reads it
// after construction, hence the lint allowance.
#[allow(dead_code)]
meter: Meter,
/// `hyperstack.ws.connections.total`: total number of WebSocket connections.
pub ws_connections_total: Counter<u64>,
/// `hyperstack.ws.connections.active`: number of active WebSocket connections.
pub ws_connections_active: UpDownCounter<i64>,
/// `hyperstack.ws.messages.received`: total WebSocket messages received from clients.
pub ws_messages_received: Counter<u64>,
/// `hyperstack.ws.messages.sent`: total WebSocket messages sent to clients.
pub ws_messages_sent: Counter<u64>,
/// `hyperstack.ws.connection.duration`: duration of WebSocket connections, in seconds.
pub ws_connection_duration: Histogram<f64>,
/// `hyperstack.ws.subscriptions.active`: active subscriptions, recorded with a
/// `view_id` attribute by the helper methods.
pub ws_subscriptions_active: UpDownCounter<i64>,
/// `hyperstack.projector.mutations.processed`: mutations processed by the projector,
/// recorded with an `entity` attribute by the helper method.
pub projector_mutations_processed: Counter<u64>,
/// `hyperstack.projector.frames.published`: frames published, recorded with `mode`
/// and `entity` attributes by the helper method.
pub projector_frames_published: Counter<u64>,
/// `hyperstack.projector.latency`: latency of mutation processing, in milliseconds.
pub projector_processing_latency: Histogram<f64>,
/// `hyperstack.stream.events.received`: events received from the stream, recorded
/// with an `event_type` attribute by the helper method.
pub stream_events_received: Counter<u64>,
/// `hyperstack.stream.errors.total`: stream errors, recorded with an `error_type`
/// attribute by the helper method.
pub stream_errors_total: Counter<u64>,
/// `hyperstack.vm.instructions.executed`: total VM instructions executed.
pub vm_instructions_executed: Counter<u64>,
/// `hyperstack.vm.events.processed`: events processed by the VM, recorded with
/// `event_type` and `program_id` attributes by the helper method.
pub vm_events_processed: Counter<u64>,
/// `hyperstack.vm.event.duration`: duration of event processing in the VM, in
/// milliseconds, recorded with an `event_type` attribute by the helper method.
pub vm_event_processing_duration: Histogram<f64>,
/// `hyperstack.vm.mutations.emitted`: mutations emitted by the VM, recorded with an
/// `entity` attribute by the helper method.
pub vm_mutations_emitted: Counter<u64>,
/// `hyperstack.vm.pda_cache.hits`: PDA reverse lookup cache hits.
pub vm_pda_cache_hits: Counter<u64>,
/// `hyperstack.vm.pda_cache.misses`: PDA reverse lookup cache misses.
pub vm_pda_cache_misses: Counter<u64>,
/// `hyperstack.vm.pending_queue.size`: size of the pending account updates queue,
/// adjusted by signed deltas.
pub vm_pending_queue_size: UpDownCounter<i64>,
/// `hyperstack.entities.active`: number of active entities being tracked, recorded
/// with an `entity` attribute by the helper methods.
pub entities_active: UpDownCounter<i64>,
}
impl Metrics {
pub fn new(service_name: impl Into<std::borrow::Cow<'static, str>>) -> Self {
let meter = global::meter(service_name);
let ws_connections_total = meter
.u64_counter("hyperstack.ws.connections.total")
.with_description("Total number of WebSocket connections")
.init();
let ws_connections_active = meter
.i64_up_down_counter("hyperstack.ws.connections.active")
.with_description("Number of active WebSocket connections")
.init();
let ws_messages_received = meter
.u64_counter("hyperstack.ws.messages.received")
.with_description("Total WebSocket messages received from clients")
.init();
let ws_messages_sent = meter
.u64_counter("hyperstack.ws.messages.sent")
.with_description("Total WebSocket messages sent to clients")
.init();
let ws_connection_duration = meter
.f64_histogram("hyperstack.ws.connection.duration")
.with_description("Duration of WebSocket connections in seconds")
.init();
let ws_subscriptions_active = meter
.i64_up_down_counter("hyperstack.ws.subscriptions.active")
.with_description("Number of active subscriptions by view")
.init();
let projector_mutations_processed = meter
.u64_counter("hyperstack.projector.mutations.processed")
.with_description("Total mutations processed by the projector")
.init();
let projector_frames_published = meter
.u64_counter("hyperstack.projector.frames.published")
.with_description("Total frames published by mode")
.init();
let projector_processing_latency = meter
.f64_histogram("hyperstack.projector.latency")
.with_description("Latency of mutation processing in milliseconds")
.init();
let stream_events_received = meter
.u64_counter("hyperstack.stream.events.received")
.with_description("Total events received from the stream")
.init();
let stream_errors_total = meter
.u64_counter("hyperstack.stream.errors.total")
.with_description("Total stream errors")
.init();
let vm_instructions_executed = meter
.u64_counter("hyperstack.vm.instructions.executed")
.with_description("Total VM instructions executed")
.init();
let vm_events_processed = meter
.u64_counter("hyperstack.vm.events.processed")
.with_description("Total events processed by the VM")
.init();
let vm_event_processing_duration = meter
.f64_histogram("hyperstack.vm.event.duration")
.with_description("Duration of event processing in the VM in milliseconds")
.init();
let vm_mutations_emitted = meter
.u64_counter("hyperstack.vm.mutations.emitted")
.with_description("Total mutations emitted by the VM")
.init();
let vm_pda_cache_hits = meter
.u64_counter("hyperstack.vm.pda_cache.hits")
.with_description("PDA reverse lookup cache hits")
.init();
let vm_pda_cache_misses = meter
.u64_counter("hyperstack.vm.pda_cache.misses")
.with_description("PDA reverse lookup cache misses")
.init();
let vm_pending_queue_size = meter
.i64_up_down_counter("hyperstack.vm.pending_queue.size")
.with_description("Size of the pending account updates queue")
.init();
let entities_active = meter
.i64_up_down_counter("hyperstack.entities.active")
.with_description("Number of active entities being tracked")
.init();
Self {
meter,
ws_connections_total,
ws_connections_active,
ws_messages_received,
ws_messages_sent,
ws_connection_duration,
ws_subscriptions_active,
projector_mutations_processed,
projector_frames_published,
projector_processing_latency,
stream_events_received,
stream_errors_total,
vm_instructions_executed,
vm_events_processed,
vm_event_processing_duration,
vm_mutations_emitted,
vm_pda_cache_hits,
vm_pda_cache_misses,
vm_pending_queue_size,
entities_active,
}
}
pub fn record_ws_connection(&self) {
self.ws_connections_total.add(1, &[]);
self.ws_connections_active.add(1, &[]);
}
pub fn record_ws_disconnection(&self, duration_secs: f64) {
self.ws_connections_active.add(-1, &[]);
self.ws_connection_duration.record(duration_secs, &[]);
}
pub fn record_ws_message_received(&self) {
self.ws_messages_received.add(1, &[]);
}
pub fn record_ws_message_sent(&self) {
self.ws_messages_sent.add(1, &[]);
}
pub fn record_subscription_created(&self, view_id: &str) {
self.ws_subscriptions_active
.add(1, &[KeyValue::new("view_id", view_id.to_string())]);
}
pub fn record_subscription_removed(&self, view_id: &str) {
self.ws_subscriptions_active
.add(-1, &[KeyValue::new("view_id", view_id.to_string())]);
}
pub fn record_mutation_processed(&self, entity: &str) {
self.projector_mutations_processed
.add(1, &[KeyValue::new("entity", entity.to_string())]);
}
pub fn record_frame_published(&self, mode: &str, entity: &str) {
self.projector_frames_published.add(
1,
&[
KeyValue::new("mode", mode.to_string()),
KeyValue::new("entity", entity.to_string()),
],
);
}
pub fn record_projector_latency(&self, latency_ms: f64) {
self.projector_processing_latency.record(latency_ms, &[]);
}
pub fn record_stream_event(&self, event_type: &str) {
self.stream_events_received
.add(1, &[KeyValue::new("event_type", event_type.to_string())]);
}
pub fn record_stream_error(&self, error_type: &str) {
self.stream_errors_total
.add(1, &[KeyValue::new("error_type", error_type.to_string())]);
}
pub fn record_vm_instructions(&self, count: u64) {
self.vm_instructions_executed.add(count, &[]);
}
pub fn record_vm_event(&self, event_type: &str, program_id: &str) {
self.vm_events_processed.add(
1,
&[
KeyValue::new("event_type", event_type.to_string()),
KeyValue::new("program_id", program_id.to_string()),
],
);
}
pub fn record_vm_event_duration(&self, duration_ms: f64, event_type: &str) {
self.vm_event_processing_duration.record(
duration_ms,
&[KeyValue::new("event_type", event_type.to_string())],
);
}
pub fn record_vm_mutations(&self, count: u64, entity: &str) {
self.vm_mutations_emitted
.add(count, &[KeyValue::new("entity", entity.to_string())]);
}
pub fn record_pda_cache_hit(&self) {
self.vm_pda_cache_hits.add(1, &[]);
}
pub fn record_pda_cache_miss(&self) {
self.vm_pda_cache_misses.add(1, &[]);
}
pub fn update_pending_queue_size(&self, delta: i64) {
self.vm_pending_queue_size.add(delta, &[]);
}
pub fn record_entity_active(&self, entity_name: &str) {
self.entities_active
.add(1, &[KeyValue::new("entity", entity_name.to_string())]);
}
pub fn record_entity_inactive(&self, entity_name: &str) {
self.entities_active
.add(-1, &[KeyValue::new("entity", entity_name.to_string())]);
}
}
/// Stopwatch that records its elapsed time into a histogram when `stop` is called.
pub struct MetricsTimer {
/// Instant captured when the timer was constructed.
start: Instant,
/// Histogram that receives the measurement on `stop`.
histogram: Histogram<f64>,
/// Attributes attached to the recorded measurement.
attributes: Vec<KeyValue>,
}
impl MetricsTimer {
    /// Starts a timer that will report into `histogram` with the given `attributes`.
    ///
    /// The clock starts at the moment of this call.
    pub fn new(histogram: Histogram<f64>, attributes: Vec<KeyValue>) -> Self {
        let start = Instant::now();
        Self {
            start,
            histogram,
            attributes,
        }
    }

    /// Consumes the timer, records the elapsed time, and returns it.
    ///
    /// The recorded and returned value is in milliseconds, so the histogram given
    /// to `new` should be one that expects milliseconds.
    pub fn stop(self) -> f64 {
        let elapsed = self.start.elapsed();
        let duration_ms = elapsed.as_secs_f64() * 1000.0;
        self.histogram
            .record(duration_ms, self.attributes.as_slice());
        duration_ms
    }
}
// Empty destructor: dropping a `MetricsTimer` records nothing. A measurement is
// only written by `MetricsTimer::stop`.
impl Drop for MetricsTimer {
fn drop(&mut self) {
// NOTE(review): presumably a placeholder for recording timers that were never
// stopped — confirm intent. Recording here as-is would double-count, because
// `stop(self)` takes the timer by value and therefore also triggers this `drop`.
}
}
#[cfg(test)]
mod tests {
    use super::Metrics;

    /// Smoke test: building the full instrument set must complete without panicking.
    #[test]
    fn test_metrics_creation() {
        drop(Metrics::new("test_service"));
    }
}