Skip to main content

ant_node/payment/
metrics.rs

1//! Quoting metrics tracking for ant-node.
2//!
3//! Tracks metrics used for quote generation, including:
4//! - `received_payment_count` - number of payments received
5//! - Storage capacity and usage
6//! - Network liveness information
7
8use ant_evm::QuotingMetrics;
9use parking_lot::RwLock;
10use std::path::PathBuf;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::time::Instant;
13use tracing::{debug, info, warn};
14
15/// Number of operations between disk persists (debounce).
16const PERSIST_INTERVAL: usize = 10;
17
18/// Tracker for quoting metrics.
19///
20/// Maintains state that influences quote pricing, including payment history,
21/// storage capacity, and network information.
22#[derive(Debug)]
23pub struct QuotingMetricsTracker {
24    /// Number of payments received by this node.
25    received_payment_count: AtomicUsize,
26    /// Maximum records the node can store.
27    max_records: usize,
28    /// Number of records currently stored.
29    close_records_stored: AtomicUsize,
30    /// Records stored by type: `Vec<(data_type_index, count)>`.
31    records_per_type: RwLock<Vec<(u32, u32)>>,
32    /// Node start time for calculating `live_time`.
33    start_time: Instant,
34    /// Path for persisting metrics (optional).
35    persist_path: Option<PathBuf>,
36    /// Estimated network size.
37    network_size: AtomicU64,
38    /// Operations since last persist (for debouncing disk I/O).
39    ops_since_persist: AtomicUsize,
40}
41
42impl QuotingMetricsTracker {
43    /// Create a new metrics tracker.
44    ///
45    /// # Arguments
46    ///
47    /// * `max_records` - Maximum number of records this node can store
48    /// * `initial_records` - Initial number of records stored
49    #[must_use]
50    pub fn new(max_records: usize, initial_records: usize) -> Self {
51        Self {
52            received_payment_count: AtomicUsize::new(0),
53            max_records,
54            close_records_stored: AtomicUsize::new(initial_records),
55            records_per_type: RwLock::new(Vec::new()),
56            start_time: Instant::now(),
57            persist_path: None,
58            network_size: AtomicU64::new(500), // Conservative default
59            ops_since_persist: AtomicUsize::new(0),
60        }
61    }
62
63    /// Create a new metrics tracker with persistence.
64    ///
65    /// # Arguments
66    ///
67    /// * `max_records` - Maximum number of records
68    /// * `persist_path` - Path to persist metrics to disk
69    #[must_use]
70    pub fn with_persistence(max_records: usize, persist_path: &std::path::Path) -> Self {
71        let mut tracker = Self::new(max_records, 0);
72        tracker.persist_path = Some(persist_path.to_path_buf());
73
74        // Try to load existing metrics
75        if let Some(loaded) = Self::load_from_disk(persist_path) {
76            tracker
77                .received_payment_count
78                .store(loaded.received_payment_count, Ordering::SeqCst);
79            tracker
80                .close_records_stored
81                .store(loaded.close_records_stored, Ordering::SeqCst);
82            *tracker.records_per_type.write() = loaded.records_per_type;
83            info!(
84                "Loaded persisted metrics: {} payments received",
85                loaded.received_payment_count
86            );
87        }
88
89        tracker
90    }
91
92    /// Record a payment received.
93    pub fn record_payment(&self) {
94        let count = self.received_payment_count.fetch_add(1, Ordering::SeqCst) + 1;
95        debug!("Payment received, total count: {count}");
96        self.maybe_persist();
97    }
98
99    /// Record data stored.
100    ///
101    /// # Arguments
102    ///
103    /// * `data_type` - Type index of the data
104    pub fn record_store(&self, data_type: u32) {
105        self.close_records_stored.fetch_add(1, Ordering::SeqCst);
106
107        // Update per-type counts (scope the write lock)
108        {
109            let mut records = self.records_per_type.write();
110            if let Some(entry) = records.iter_mut().find(|(t, _)| *t == data_type) {
111                entry.1 = entry.1.saturating_add(1);
112            } else {
113                records.push((data_type, 1));
114            }
115        }
116
117        self.maybe_persist();
118    }
119
120    /// Get the number of payments received.
121    #[must_use]
122    pub fn payment_count(&self) -> usize {
123        self.received_payment_count.load(Ordering::SeqCst)
124    }
125
126    /// Get the number of records stored.
127    #[must_use]
128    pub fn records_stored(&self) -> usize {
129        self.close_records_stored.load(Ordering::SeqCst)
130    }
131
132    /// Get the node's live time in hours.
133    #[must_use]
134    pub fn live_time_hours(&self) -> u64 {
135        self.start_time.elapsed().as_secs() / 3600
136    }
137
138    /// Update the estimated network size.
139    pub fn set_network_size(&self, size: u64) {
140        self.network_size.store(size, Ordering::SeqCst);
141    }
142
143    /// Get quoting metrics for quote generation.
144    ///
145    /// # Arguments
146    ///
147    /// * `data_size` - Size of the data being quoted
148    /// * `data_type` - Type index of the data
149    #[must_use]
150    pub fn get_metrics(&self, data_size: usize, data_type: u32) -> QuotingMetrics {
151        QuotingMetrics {
152            data_type,
153            data_size,
154            close_records_stored: self.close_records_stored.load(Ordering::SeqCst),
155            records_per_type: self.records_per_type.read().clone(),
156            max_records: self.max_records,
157            received_payment_count: self.received_payment_count.load(Ordering::SeqCst),
158            live_time: self.live_time_hours(),
159            network_density: None, // Not used in pricing; reserved for future DHT range filtering
160            network_size: Some(self.network_size.load(Ordering::SeqCst)),
161        }
162    }
163
164    /// Debounced persist: only writes to disk every `PERSIST_INTERVAL` operations.
165    fn maybe_persist(&self) {
166        let ops = self.ops_since_persist.fetch_add(1, Ordering::Relaxed);
167        if ops % PERSIST_INTERVAL == 0 {
168            self.persist();
169        }
170    }
171
172    /// Persist metrics to disk.
173    fn persist(&self) {
174        if let Some(ref path) = self.persist_path {
175            let data = PersistedMetrics {
176                received_payment_count: self.received_payment_count.load(Ordering::SeqCst),
177                close_records_stored: self.close_records_stored.load(Ordering::SeqCst),
178                records_per_type: self.records_per_type.read().clone(),
179            };
180
181            if let Ok(bytes) = rmp_serde::to_vec(&data) {
182                if let Err(e) = std::fs::write(path, bytes) {
183                    warn!("Failed to persist metrics: {e}");
184                }
185            }
186        }
187    }
188
189    /// Load metrics from disk.
190    fn load_from_disk(path: &std::path::Path) -> Option<PersistedMetrics> {
191        let bytes = std::fs::read(path).ok()?;
192        rmp_serde::from_slice(&bytes).ok()
193    }
194}
195
196impl Drop for QuotingMetricsTracker {
197    fn drop(&mut self) {
198        self.persist();
199    }
200}
201
202/// Metrics persisted to disk.
203#[derive(Debug, serde::Serialize, serde::Deserialize)]
204struct PersistedMetrics {
205    received_payment_count: usize,
206    close_records_stored: usize,
207    records_per_type: Vec<(u32, u32)>,
208}
209
210#[cfg(test)]
211#[allow(clippy::expect_used)]
212mod tests {
213    use super::*;
214    use tempfile::tempdir;
215
216    #[test]
217    fn test_new_tracker() {
218        let tracker = QuotingMetricsTracker::new(1000, 50);
219        assert_eq!(tracker.payment_count(), 0);
220        assert_eq!(tracker.records_stored(), 50);
221    }
222
223    #[test]
224    fn test_record_payment() {
225        let tracker = QuotingMetricsTracker::new(1000, 0);
226        assert_eq!(tracker.payment_count(), 0);
227
228        tracker.record_payment();
229        assert_eq!(tracker.payment_count(), 1);
230
231        tracker.record_payment();
232        assert_eq!(tracker.payment_count(), 2);
233    }
234
235    #[test]
236    fn test_record_store() {
237        let tracker = QuotingMetricsTracker::new(1000, 0);
238        assert_eq!(tracker.records_stored(), 0);
239
240        tracker.record_store(0); // Chunk type
241        assert_eq!(tracker.records_stored(), 1);
242
243        tracker.record_store(0);
244        tracker.record_store(1); // Different type
245        assert_eq!(tracker.records_stored(), 3);
246
247        let metrics = tracker.get_metrics(1024, 0);
248        assert_eq!(metrics.records_per_type.len(), 2);
249    }
250
251    #[test]
252    fn test_get_metrics() {
253        let tracker = QuotingMetricsTracker::new(1000, 100);
254        tracker.record_payment();
255        tracker.record_payment();
256
257        let metrics = tracker.get_metrics(2048, 0);
258        assert_eq!(metrics.data_size, 2048);
259        assert_eq!(metrics.data_type, 0);
260        assert_eq!(metrics.max_records, 1000);
261        assert_eq!(metrics.close_records_stored, 100);
262        assert_eq!(metrics.received_payment_count, 2);
263    }
264
265    #[test]
266    fn test_persistence() {
267        let dir = tempdir().expect("tempdir");
268        let path = dir.path().join("metrics.bin");
269
270        // Create and populate tracker
271        {
272            let tracker = QuotingMetricsTracker::with_persistence(1000, &path);
273            tracker.record_payment();
274            tracker.record_payment();
275            tracker.record_store(0);
276        }
277
278        // Load from disk
279        let tracker = QuotingMetricsTracker::with_persistence(1000, &path);
280        assert_eq!(tracker.payment_count(), 2);
281        assert_eq!(tracker.records_stored(), 1);
282    }
283
284    #[test]
285    fn test_live_time_hours() {
286        let tracker = QuotingMetricsTracker::new(1000, 0);
287        // Just started, so live_time should be 0 hours
288        assert_eq!(tracker.live_time_hours(), 0);
289    }
290
291    #[test]
292    fn test_set_network_size() {
293        let tracker = QuotingMetricsTracker::new(1000, 0);
294        tracker.set_network_size(1000);
295
296        let metrics = tracker.get_metrics(0, 0);
297        assert_eq!(metrics.network_size, Some(1000));
298    }
299
300    #[test]
301    fn test_records_per_type_multiple_types() {
302        let tracker = QuotingMetricsTracker::new(1000, 0);
303
304        tracker.record_store(0);
305        tracker.record_store(0);
306        tracker.record_store(1);
307        tracker.record_store(2);
308        tracker.record_store(1);
309
310        let metrics = tracker.get_metrics(0, 0);
311        assert_eq!(metrics.records_per_type.len(), 3);
312
313        // Verify per-type counts
314        let type_0 = metrics.records_per_type.iter().find(|(t, _)| *t == 0);
315        let type_1 = metrics.records_per_type.iter().find(|(t, _)| *t == 1);
316        let type_2 = metrics.records_per_type.iter().find(|(t, _)| *t == 2);
317
318        assert_eq!(type_0.expect("type 0 exists").1, 2);
319        assert_eq!(type_1.expect("type 1 exists").1, 2);
320        assert_eq!(type_2.expect("type 2 exists").1, 1);
321    }
322
323    #[test]
324    fn test_persistence_round_trip_with_types() {
325        let dir = tempdir().expect("tempdir");
326        let path = dir.path().join("metrics_types.bin");
327
328        {
329            let tracker = QuotingMetricsTracker::with_persistence(1000, &path);
330            tracker.record_store(0);
331            tracker.record_store(0);
332            tracker.record_store(1);
333            tracker.record_payment();
334        }
335
336        let tracker = QuotingMetricsTracker::with_persistence(1000, &path);
337        assert_eq!(tracker.payment_count(), 1);
338        assert_eq!(tracker.records_stored(), 3); // 2 type-0 + 1 type-1
339
340        let metrics = tracker.get_metrics(0, 0);
341        assert_eq!(metrics.records_per_type.len(), 2);
342    }
343
344    #[test]
345    fn test_with_persistence_nonexistent_path() {
346        let dir = tempdir().expect("tempdir");
347        let path = dir.path().join("nonexistent_subdir").join("metrics.bin");
348
349        // Should not panic — just starts with defaults
350        let tracker = QuotingMetricsTracker::with_persistence(1000, &path);
351        assert_eq!(tracker.payment_count(), 0);
352        assert_eq!(tracker.records_stored(), 0);
353    }
354
355    #[test]
356    fn test_max_records_zero() {
357        let tracker = QuotingMetricsTracker::new(0, 0);
358        let metrics = tracker.get_metrics(1024, 0);
359        assert_eq!(metrics.max_records, 0);
360    }
361
362    #[test]
363    fn test_get_metrics_passes_data_params() {
364        let tracker = QuotingMetricsTracker::new(1000, 0);
365        let metrics = tracker.get_metrics(4096, 3);
366        assert_eq!(metrics.data_size, 4096);
367        assert_eq!(metrics.data_type, 3);
368    }
369
370    #[test]
371    fn test_default_network_size() {
372        let tracker = QuotingMetricsTracker::new(1000, 0);
373        let metrics = tracker.get_metrics(0, 0);
374        assert_eq!(metrics.network_size, Some(500));
375    }
376}