calimero-node 0.10.0

Core Calimero infrastructure and tools
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
//! Prometheus-based sync metrics implementation.
//!
//! Provides production-grade observability for the sync protocol using
//! the `prometheus-client` crate (already in dependencies).
//!
//! # Metric Categories
//!
//! ## Protocol Cost Metrics
//! - `sync_messages_sent_total{protocol}`: Messages sent by protocol type
//! - `sync_bytes_sent_total{protocol}`: Bytes sent by protocol type
//! - `sync_round_trips_total{protocol}`: Round trips by protocol type
//! - `sync_entities_transferred_total`: Total entities transferred
//! - `sync_merges_total{crdt_type}`: CRDT merges by type
//! - `sync_comparisons_total`: Hash comparisons performed
//!
//! ## Phase Timing
//! - `sync_phase_duration_seconds{phase}`: Histogram of phase durations
//!
//! ## Safety Metrics (Invariant Monitoring)
//! - `sync_snapshot_blocked_total`: Snapshot attempts blocked (I5)
//! - `sync_verification_failures_total`: Verification failures (I7)
//! - `sync_lww_fallback_total`: LWW fallback events
//! - `sync_buffer_drops_total`: Delta buffer drops (I6)
//!
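//! ## Sync Session Metrics
//! - `sync_duration_seconds{protocol,outcome}`: Session duration histogram
//!   (currently only observed for completed sessions, `outcome="success"`)
//! - `sync_attempts_total{protocol}`: Total sync attempts
//! - `sync_successes_total{protocol}`: Successful syncs
//! - `sync_failures_total{protocol}`: Failed syncs
//!
//! ## Protocol Selection Metrics
//! - `sync_protocol_selections_total{protocol}`: Protocol selection decisions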

use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::{exponential_buckets, Histogram};
use prometheus_client::registry::Registry;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use super::metrics::{PhaseTimer, SyncMetricsCollector};

/// Allow-list of sync protocol names accepted as `protocol` label values.
///
/// Label values outside this list are collapsed to "unknown" by
/// [`sanitize_protocol`], which keeps label cardinality bounded even when the
/// protocol name comes from untrusted input.
const KNOWN_PROTOCOLS: &[&str] = &[
    "None",
    "Snapshot",
    "HashComparison",
    "DeltaSync",
    "SubtreePrefetch",
    "LevelWise",
    "BloomFilter",
];

/// Allow-list of CRDT type names accepted as `crdt_type` label values.
///
/// The list already contains "unknown", which is also the fallback value
/// produced by [`sanitize_crdt_type`] for names that are not listed.
const KNOWN_CRDT_TYPES: &[&str] = &[
    "GCounter",
    "PnCounter",
    "LwwRegister",
    "GSet",
    "ORSet",
    "LwwMap",
    "unknown",
];

/// Map a protocol name onto a bounded set of label values.
///
/// The comparison is exact (case-sensitive). A match returns the `'static`
/// entry from [`KNOWN_PROTOCOLS`]; anything else returns "unknown".
fn sanitize_protocol(protocol: &str) -> &'static str {
    for &known in KNOWN_PROTOCOLS {
        if known == protocol {
            return known;
        }
    }
    "unknown"
}

/// Map a CRDT type name onto a bounded set of label values.
///
/// The comparison is exact (case-sensitive). A match returns the `'static`
/// entry from [`KNOWN_CRDT_TYPES`]; anything else returns "unknown".
fn sanitize_crdt_type(crdt_type: &str) -> &'static str {
    for &known in KNOWN_CRDT_TYPES {
        if known == crdt_type {
            return known;
        }
    }
    "unknown"
}

/// Label set carrying a single `protocol` label.
///
/// Key type for the per-protocol counter families of
/// `PrometheusSyncMetrics` (messages, bytes, round trips, attempts,
/// successes, failures and protocol selections). Every construction site in
/// this file fills `protocol` from `sanitize_protocol`, so the value is one of
/// the known protocol names or "unknown".
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct ProtocolLabels {
    protocol: String,
}

/// Label set carrying a single `crdt_type` label.
///
/// Key type for the merge counter family. The only construction site in this
/// file fills `crdt_type` from `sanitize_crdt_type`, so the value is one of
/// the known CRDT type names or "unknown".
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct CrdtLabels {
    crdt_type: String,
}

/// Label set carrying a single `phase` label.
///
/// Key type for the phase-duration histogram family. Unlike the protocol and
/// CRDT labels, `phase` is taken from `PhaseTimer::phase()` without
/// sanitization; the recording code relies on phase names being supplied by
/// this crate rather than by external input.
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct PhaseLabels {
    phase: String,
}

/// Label set carrying `protocol` and `outcome` labels.
///
/// Key type for the session-duration histogram family. `protocol` is filled
/// from `sanitize_protocol`; the only `outcome` value produced in this file
/// is "success" (see `record_sync_complete`).
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
struct OutcomeLabels {
    protocol: String,
    outcome: String,
}

/// Prometheus-based sync metrics collector.
///
/// Construct it with `PrometheusSyncMetrics::new`, which also registers every
/// metric with the supplied registry during node initialization.
///
/// Unlabelled counters are backed by `AtomicU64`, and every recording method
/// of the `SyncMetricsCollector` implementation takes `&self`, so one
/// collector can be shared between callers without external locking.
#[derive(Debug)]
pub struct PrometheusSyncMetrics {
    // Protocol cost metrics
    /// Messages sent, keyed by protocol.
    messages_sent: Family<ProtocolLabels, Counter>,
    /// Bytes sent, keyed by protocol.
    bytes_sent: Family<ProtocolLabels, Counter>,
    /// Round trips, keyed by protocol.
    round_trips: Family<ProtocolLabels, Counter>,
    /// Entities transferred during sync (unlabelled).
    entities_transferred: Counter<u64, AtomicU64>,
    /// CRDT merge operations, keyed by CRDT type.
    merges_total: Family<CrdtLabels, Counter>,
    /// Entity hash comparisons performed (unlabelled).
    comparisons_total: Counter<u64, AtomicU64>,

    // Phase timing
    /// Phase durations in seconds, keyed by phase name.
    phase_duration_seconds: Family<PhaseLabels, Histogram>,

    // Safety metrics
    /// Snapshot attempts blocked on initialized nodes (I5 protection).
    snapshot_blocked_total: Counter<u64, AtomicU64>,
    /// Snapshot verification failures (I7 violations).
    verification_failures_total: Counter<u64, AtomicU64>,
    /// LWW fallback events due to missing CRDT type metadata.
    lww_fallback_total: Counter<u64, AtomicU64>,
    /// Delta buffer drop events (I6 violation risk).
    buffer_drops_total: Counter<u64, AtomicU64>,

    // Sync session metrics
    /// Session durations in seconds, keyed by protocol and outcome.
    sync_duration_seconds: Family<OutcomeLabels, Histogram>,
    /// Sync attempts, keyed by protocol.
    sync_attempts_total: Family<ProtocolLabels, Counter>,
    /// Successful syncs, keyed by protocol.
    sync_successes_total: Family<ProtocolLabels, Counter>,
    /// Failed syncs, keyed by protocol.
    sync_failures_total: Family<ProtocolLabels, Counter>,

    // Protocol selection metrics
    /// Protocol selection decisions, keyed by the selected protocol.
    protocol_selections_total: Family<ProtocolLabels, Counter>,
}

impl PrometheusSyncMetrics {
    /// Create and register sync metrics with a Prometheus registry.
    ///
    /// # Arguments
    /// - `registry`: The Prometheus registry to register metrics with
    ///
    /// # Example
    ///
    /// ```rust,ignore
    /// use prometheus_client::registry::Registry;
    /// use calimero_node::sync::prometheus_metrics::PrometheusSyncMetrics;
    ///
    /// let mut registry = Registry::default();
    /// let metrics = PrometheusSyncMetrics::new(&mut registry);
    /// ```
    pub fn new(registry: &mut Registry) -> Self {
        // Create metrics with sensible histogram buckets
        let metrics = Self {
            messages_sent: Family::default(),
            bytes_sent: Family::default(),
            round_trips: Family::default(),
            entities_transferred: Counter::default(),
            merges_total: Family::default(),
            comparisons_total: Counter::default(),
            phase_duration_seconds: Family::new_with_constructor(|| {
                // Buckets from 1ms to ~16s (15 exponential buckets, base 2)
                Histogram::new(exponential_buckets(0.001, 2.0, 15))
            }),
            snapshot_blocked_total: Counter::default(),
            verification_failures_total: Counter::default(),
            lww_fallback_total: Counter::default(),
            buffer_drops_total: Counter::default(),
            sync_duration_seconds: Family::new_with_constructor(|| {
                // Buckets from 10ms to ~160s (15 exponential buckets, base 2)
                Histogram::new(exponential_buckets(0.01, 2.0, 15))
            }),
            sync_attempts_total: Family::default(),
            sync_successes_total: Family::default(),
            sync_failures_total: Family::default(),
            protocol_selections_total: Family::default(),
        };

        // Register all metrics with descriptions
        registry.register(
            "sync_messages_sent",
            "Total sync protocol messages sent",
            metrics.messages_sent.clone(),
        );
        registry.register(
            "sync_bytes_sent",
            "Total sync protocol bytes sent",
            metrics.bytes_sent.clone(),
        );
        registry.register(
            "sync_round_trips",
            "Total sync round trips",
            metrics.round_trips.clone(),
        );
        registry.register(
            "sync_entities_transferred",
            "Total entities transferred during sync",
            metrics.entities_transferred.clone(),
        );
        registry.register(
            "sync_merges",
            "Total CRDT merge operations",
            metrics.merges_total.clone(),
        );
        registry.register(
            "sync_comparisons",
            "Total entity hash comparisons",
            metrics.comparisons_total.clone(),
        );
        registry.register(
            "sync_phase_duration_seconds",
            "Duration of sync phases in seconds",
            metrics.phase_duration_seconds.clone(),
        );
        registry.register(
            "sync_snapshot_blocked",
            "Snapshot attempts blocked on initialized nodes (I5 protection)",
            metrics.snapshot_blocked_total.clone(),
        );
        registry.register(
            "sync_verification_failures",
            "Snapshot verification failures (I7 violations)",
            metrics.verification_failures_total.clone(),
        );
        registry.register(
            "sync_lww_fallback",
            "LWW fallback events due to missing CRDT type metadata",
            metrics.lww_fallback_total.clone(),
        );
        registry.register(
            "sync_buffer_drops",
            "Delta buffer drop events (I6 violation risk)",
            metrics.buffer_drops_total.clone(),
        );
        registry.register(
            "sync_duration_seconds",
            "Duration of sync sessions in seconds",
            metrics.sync_duration_seconds.clone(),
        );
        registry.register(
            "sync_attempts",
            "Total sync attempts by protocol",
            metrics.sync_attempts_total.clone(),
        );
        registry.register(
            "sync_successes",
            "Total successful syncs by protocol",
            metrics.sync_successes_total.clone(),
        );
        registry.register(
            "sync_failures",
            "Total failed syncs by protocol",
            metrics.sync_failures_total.clone(),
        );
        registry.register(
            "sync_protocol_selections",
            "Total protocol selection decisions by protocol",
            metrics.protocol_selections_total.clone(),
        );

        metrics
    }
}

impl SyncMetricsCollector for PrometheusSyncMetrics {
    fn record_message_sent(&self, protocol: &str, bytes: usize) {
        let labels = ProtocolLabels {
            protocol: sanitize_protocol(protocol).to_string(),
        };
        self.messages_sent.get_or_create(&labels).inc();
        self.bytes_sent.get_or_create(&labels).inc_by(bytes as u64);
    }

    fn record_round_trip(&self, protocol: &str) {
        let labels = ProtocolLabels {
            protocol: sanitize_protocol(protocol).to_string(),
        };
        self.round_trips.get_or_create(&labels).inc();
    }

    fn record_entities_transferred(&self, count: usize) {
        self.entities_transferred.inc_by(count as u64);
    }

    fn record_merge(&self, crdt_type: &str) {
        let labels = CrdtLabels {
            crdt_type: sanitize_crdt_type(crdt_type).to_string(),
        };
        self.merges_total.get_or_create(&labels).inc();
    }

    fn record_comparison(&self) {
        self.comparisons_total.inc();
    }

    fn record_phase_complete(&self, timer: PhaseTimer) {
        // Phase names are &'static str from our code, so no sanitization needed
        let labels = PhaseLabels {
            phase: timer.phase().to_string(),
        };
        self.phase_duration_seconds
            .get_or_create(&labels)
            .observe(timer.elapsed().as_secs_f64());
    }

    fn record_snapshot_blocked(&self) {
        self.snapshot_blocked_total.inc();
    }

    fn record_verification_failure(&self) {
        self.verification_failures_total.inc();
    }

    fn record_lww_fallback(&self) {
        self.lww_fallback_total.inc();
    }

    fn record_buffer_drop(&self) {
        self.buffer_drops_total.inc();
    }

    fn record_sync_start(&self, _context_id: &str, protocol: &str, _trigger: &str) {
        let labels = ProtocolLabels {
            protocol: sanitize_protocol(protocol).to_string(),
        };
        self.sync_attempts_total.get_or_create(&labels).inc();
    }

    fn record_sync_complete(
        &self,
        _context_id: &str,
        protocol: &str,
        duration: Duration,
        _entities: usize,
    ) {
        let sanitized = sanitize_protocol(protocol);
        let labels = OutcomeLabels {
            protocol: sanitized.to_string(),
            outcome: "success".to_string(),
        };
        self.sync_duration_seconds
            .get_or_create(&labels)
            .observe(duration.as_secs_f64());

        // Increment success counter
        let success_labels = ProtocolLabels {
            protocol: sanitized.to_string(),
        };
        self.sync_successes_total
            .get_or_create(&success_labels)
            .inc();
    }

    fn record_sync_failure(&self, _context_id: &str, protocol: &str, _reason: &str) {
        let labels = ProtocolLabels {
            protocol: sanitize_protocol(protocol).to_string(),
        };
        self.sync_failures_total.get_or_create(&labels).inc();
    }

    fn record_protocol_selected(&self, protocol: &str, _reason: &str, _divergence: f64) {
        let labels = ProtocolLabels {
            protocol: sanitize_protocol(protocol).to_string(),
        };
        self.protocol_selections_total.get_or_create(&labels).inc();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Encode `registry` into the text exposition format.
    fn encode(registry: &Registry) -> String {
        let mut buffer = String::new();
        prometheus_client::encoding::text::encode(&mut buffer, registry)
            .expect("encoding into a String should not fail");
        buffer
    }

    /// Assert that `output` contains `expected` as one complete line.
    ///
    /// Matching whole lines avoids false positives such as a value of `1`
    /// matching a sample whose value is `10`.
    fn assert_has_line(output: &str, expected: &str) {
        assert!(
            output.lines().any(|line| line == expected),
            "expected line `{expected}` in encoded metrics:\n{output}"
        );
    }

    #[test]
    fn test_prometheus_metrics_creation() {
        let mut registry = Registry::default();
        let _metrics = PrometheusSyncMetrics::new(&mut registry);

        // Registered metrics show up in the encoded output even before any
        // sample is recorded.
        let buffer = encode(&registry);
        assert!(buffer.contains("sync_messages_sent"));
        assert!(buffer.contains("sync_snapshot_blocked"));
        assert!(buffer.contains("sync_buffer_drops"));
        assert!(buffer.contains("sync_protocol_selections"));
    }

    #[test]
    fn test_prometheus_metrics_is_send_sync() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<PrometheusSyncMetrics>();
    }

    #[test]
    fn test_prometheus_metrics_recording() {
        let mut registry = Registry::default();
        let metrics = PrometheusSyncMetrics::new(&mut registry);

        metrics.record_message_sent("HashComparison", 1024);
        metrics.record_round_trip("HashComparison");
        metrics.record_entities_transferred(10);
        metrics.record_merge("GCounter");
        metrics.record_comparison();
        metrics.record_snapshot_blocked();
        metrics.record_verification_failure();
        metrics.record_lww_fallback();
        metrics.record_buffer_drop();

        let timer = metrics.start_phase("test_phase");
        metrics.record_phase_complete(timer);

        metrics.record_sync_start("ctx-123", "HashComparison", "timer");
        metrics.record_sync_complete("ctx-123", "HashComparison", Duration::from_millis(100), 50);
        metrics.record_sync_failure("ctx-456", "Snapshot", "timeout");
        metrics.record_protocol_selected("HashComparison", "test", 0.05);

        // A bare non-empty check would pass even if nothing had been
        // recorded, so assert the concrete samples instead.
        let buffer = encode(&registry);
        assert_has_line(
            &buffer,
            "sync_messages_sent_total{protocol=\"HashComparison\"} 1",
        );
        assert_has_line(
            &buffer,
            "sync_bytes_sent_total{protocol=\"HashComparison\"} 1024",
        );
        assert_has_line(
            &buffer,
            "sync_round_trips_total{protocol=\"HashComparison\"} 1",
        );
        assert_has_line(&buffer, "sync_entities_transferred_total 10");
        assert_has_line(&buffer, "sync_merges_total{crdt_type=\"GCounter\"} 1");
        assert_has_line(&buffer, "sync_comparisons_total 1");
        assert_has_line(&buffer, "sync_snapshot_blocked_total 1");
        assert_has_line(&buffer, "sync_verification_failures_total 1");
        assert_has_line(&buffer, "sync_lww_fallback_total 1");
        assert_has_line(&buffer, "sync_buffer_drops_total 1");
        assert_has_line(
            &buffer,
            "sync_attempts_total{protocol=\"HashComparison\"} 1",
        );
        assert_has_line(
            &buffer,
            "sync_successes_total{protocol=\"HashComparison\"} 1",
        );
        assert_has_line(&buffer, "sync_failures_total{protocol=\"Snapshot\"} 1");
        assert_has_line(
            &buffer,
            "sync_protocol_selections_total{protocol=\"HashComparison\"} 1",
        );
    }

    #[test]
    fn test_unknown_labels_are_sanitized() {
        let mut registry = Registry::default();
        let metrics = PrometheusSyncMetrics::new(&mut registry);

        // Two different unrecognised names must land in the same series.
        metrics.record_round_trip("attacker-controlled-1");
        metrics.record_round_trip("attacker-controlled-2");
        metrics.record_merge("NotACrdt");

        let buffer = encode(&registry);
        assert_has_line(&buffer, "sync_round_trips_total{protocol=\"unknown\"} 2");
        assert_has_line(&buffer, "sync_merges_total{crdt_type=\"unknown\"} 1");
        assert!(!buffer.contains("attacker-controlled"));
        assert!(!buffer.contains("NotACrdt"));
    }

    #[test]
    fn test_sanitize_helpers() {
        for &name in KNOWN_PROTOCOLS {
            assert_eq!(sanitize_protocol(name), name);
        }
        for &name in KNOWN_CRDT_TYPES {
            assert_eq!(sanitize_crdt_type(name), name);
        }
        assert_eq!(sanitize_protocol(""), "unknown");
        assert_eq!(sanitize_protocol("hashcomparison"), "unknown");
        assert_eq!(sanitize_crdt_type(""), "unknown");
        assert_eq!(sanitize_crdt_type("gcounter"), "unknown");
    }
}