near_async/instrumentation/
queue.rs

1use core::str;
2use std::collections::HashMap;
3use std::sync::Arc;
4use std::sync::atomic::AtomicU64;
5
6use parking_lot::RwLock;
7
8use crate::instrumentation::metrics::QUEUE_PENDING_MESSAGES;
9use near_o11y::metrics::prometheus;
10
11/// InstrumentedQueue keeps track of the number of pending messages of each type in the queue.
12pub struct InstrumentedQueue {
13    pending: RwLock<HashMap<String, AtomicU64>>,
14    pending_messages_gauge: prometheus::core::GenericGauge<prometheus::core::AtomicI64>,
15}
16
17impl InstrumentedQueue {
18    pub fn new(queue_name: &str) -> Arc<Self> {
19        let queue = Arc::new(Self {
20            pending: RwLock::new(HashMap::new()),
21            pending_messages_gauge: QUEUE_PENDING_MESSAGES.with_label_values(&[queue_name]),
22        });
23        queue
24    }
25
26    pub fn enqueue(&self, message_type: &str) {
27        {
28            // Try just with read lock first, to avoid write lock in the common case.
29            let pending = self.pending.read();
30            if let Some(counter) = pending.get(message_type) {
31                counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
32                self.pending_messages_gauge.inc();
33                return;
34            }
35        }
36        // Use write lock if the message type is not found.
37        let mut pending = self.pending.write();
38        let counter = pending.entry(message_type.to_string()).or_insert_with(|| AtomicU64::new(0));
39        counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
40        self.pending_messages_gauge.inc();
41    }
42
43    pub fn dequeue(&self, message_type: &str) {
44        let pending = self.pending.read();
45        if let Some(counter) = pending.get(message_type) {
46            counter.fetch_sub(1, std::sync::atomic::Ordering::Relaxed);
47            self.pending_messages_gauge.dec();
48        }
49    }
50
51    pub fn get_pending_events(&self) -> HashMap<String, u64> {
52        let pending = self.pending.read();
53        pending
54            .iter()
55            .map(|(k, v)| (k.clone(), v.load(std::sync::atomic::Ordering::Relaxed)))
56            .collect()
57    }
58}