1#![deny(clippy::unwrap_used)]
2#![deny(clippy::expect_used)]
3
4pub mod golden_signals;
5pub mod otlp;
6mod tracing_export;
7
8use std::net::SocketAddr;
9use std::sync::atomic::{AtomicU64, Ordering};
10
11pub use golden_signals::{
12 ERRORS_BY_TYPE,
13 ERRORS_TOTAL,
14 ErrorType,
16 GoldenSignalsSnapshot,
18 LATENCY_FETCH,
19 LATENCY_INGEST,
20 LATENCY_IO,
21 LATENCY_NETWORK,
22 LATENCY_REPLICATION,
23 LATENCY_SAMPLE_RATE,
24 LatencyHistogram,
26 LatencySnapshot,
27 LatencyTimer,
28 RATE_INGEST_BYTES,
29 RATE_INGEST_OPS,
30 RATE_READ_BYTES,
31 RATE_READ_OPS,
32 RateTracker,
34 SATURATION_BUFFER_POOL_TOTAL,
35 SATURATION_BUFFER_POOL_USED,
36 SATURATION_CONNECTIONS_MAX,
37 SATURATION_CONNECTIONS_USED,
38 SATURATION_MEMORY_TOTAL,
39 SATURATION_MEMORY_USED,
40 SATURATION_PENDING_IO,
41 SATURATION_QUEUE_CAPACITY,
42 SATURATION_QUEUE_DEPTH,
43 SampledTimer,
45 buffer_pool_saturation,
46 get_error_count,
47 get_total_errors,
48 memory_saturation,
49 queue_saturation,
50 record_error,
51 record_fetch_latency,
52 record_fetch_latency_sampled,
53 record_ingest_latency,
54 record_ingest_latency_sampled,
55 record_io_latency,
56 record_io_latency_sampled,
57 record_network_latency,
58 record_peer_replication_latency,
59 record_replication_latency,
60 saturation_ratio,
61 set_buffer_pool_usage,
62 set_connection_usage,
63 set_latency_sample_rate,
64 set_memory_usage,
65 set_pending_io,
66 set_queue_depth,
68 should_sample,
69 time_fetch,
70 time_fetch_sampled,
71 time_ingest,
72 time_ingest_sampled,
73 time_io,
74 time_io_sampled,
75 update_rates,
76};
77pub use tracing_export::{LocalSpan, OperationType, SpanContext, TracingConfig, init_tracing};
78
// Core ingest, write-path, connection, and quorum metrics. The helper
// functions later in this file update them with relaxed atomic operations.

/// Total number of records ingested; see `increment_records_ingested`.
pub static RECORDS_INGESTED: AtomicU64 = AtomicU64::new(0);
/// Total bytes ingested; see `increment_bytes_ingested`.
pub static BYTES_INGESTED: AtomicU64 = AtomicU64::new(0);
/// Total batches written; see `increment_batches_written`.
pub static BATCHES_WRITTEN: AtomicU64 = AtomicU64::new(0);
/// Total bytes written; see `increment_bytes_written`.
pub static BYTES_WRITTEN: AtomicU64 = AtomicU64::new(0);
/// Total I/O operations submitted; see `increment_io_submitted`.
pub static IO_OPS_SUBMITTED: AtomicU64 = AtomicU64::new(0);
/// Total I/O operations completed; see `increment_io_completed`.
pub static IO_OPS_COMPLETED: AtomicU64 = AtomicU64::new(0);
/// Gauge of currently active connections; raised by `increment_connections`
/// and lowered by `decrement_connections`.
pub static CONNECTIONS_ACTIVE: AtomicU64 = AtomicU64::new(0);
/// Total connections ever established; raised by `increment_connections`.
pub static CONNECTIONS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Total CRC validation failures; see `increment_crc_failures`.
pub static CRC_FAILURES: AtomicU64 = AtomicU64::new(0);
/// Total backpressure events; see `increment_backpressure`.
pub static BACKPRESSURE_EVENTS: AtomicU64 = AtomicU64::new(0);
/// Buffer pool exhaustion events; see `increment_pool_exhausted`.
pub static POOL_EXHAUSTED: AtomicU64 = AtomicU64::new(0);
/// NUMA misalignment warnings; see `increment_numa_misaligned`.
pub static NUMA_MISALIGNED: AtomicU64 = AtomicU64::new(0);
/// Followers evicted from quorum; see `increment_follower_evictions`.
pub static FOLLOWER_EVICTIONS: AtomicU64 = AtomicU64::new(0);
/// Followers recovered to quorum; see `increment_follower_recoveries`.
pub static FOLLOWER_RECOVERIES: AtomicU64 = AtomicU64::new(0);
/// Quorum-not-reached events; see `increment_quorum_failures`.
pub static QUORUM_FAILURES: AtomicU64 = AtomicU64::new(0);
/// Quorum timeout events; see `increment_quorum_timeouts`.
// NOTE(review): unlike its neighbours, this counter has no field in
// `MetricsSnapshot` in the visible code — confirm whether that is intentional.
pub static QUORUM_TIMEOUTS: AtomicU64 = AtomicU64::new(0);
95
// HLC (hybrid logical clock) metrics.

/// Gauge of the most recently reported HLC drift in milliseconds; see `set_hlc_drift_ms`.
pub static HLC_DRIFT_MS: AtomicU64 = AtomicU64::new(0);
/// Count of HLC drift warning events; see `increment_hlc_drift_warnings`.
pub static HLC_DRIFT_WARNINGS: AtomicU64 = AtomicU64::new(0);
/// Count of HLC drift critical events; see `increment_hlc_drift_critical`.
pub static HLC_DRIFT_CRITICAL: AtomicU64 = AtomicU64::new(0);
/// Count of HLC logical counter exhaustion events; see `increment_hlc_logical_exhausted`.
pub static HLC_LOGICAL_EXHAUSTED: AtomicU64 = AtomicU64::new(0);
101
// Raft election and leadership metrics.

/// Raft elections started; see `increment_raft_elections_started`.
pub static RAFT_ELECTIONS_STARTED: AtomicU64 = AtomicU64::new(0);
/// Raft elections won; see `increment_raft_elections_won`.
pub static RAFT_ELECTIONS_WON: AtomicU64 = AtomicU64::new(0);
/// Raft pre-votes rejected; see `increment_raft_pre_votes_rejected`.
pub static RAFT_PRE_VOTES_REJECTED: AtomicU64 = AtomicU64::new(0);
/// Raft leader stepdowns; see `increment_raft_leader_stepdowns`.
pub static RAFT_LEADER_STEPDOWNS: AtomicU64 = AtomicU64::new(0);
/// Raft fencing rejections; see `increment_raft_fencing_rejections`.
pub static RAFT_FENCING_REJECTIONS: AtomicU64 = AtomicU64::new(0);
/// Number of leader tenures recorded via `record_raft_leader_tenure_ms`.
pub static RAFT_LEADER_TENURE_ENDED: AtomicU64 = AtomicU64::new(0);
/// Sum of all tenure durations (ms) recorded via `record_raft_leader_tenure_ms`;
/// divided by `RAFT_LEADER_TENURE_ENDED` there to derive the average.
pub static RAFT_LEADER_TENURE_SUM_MS: AtomicU64 = AtomicU64::new(0);
/// Most recently recorded leader tenure duration in milliseconds.
pub static RAFT_LEADER_TENURE_LAST_MS: AtomicU64 = AtomicU64::new(0);
/// Average recorded leader tenure in milliseconds (sum / count, integer division).
pub static RAFT_LEADER_TENURE_AVG_MS: AtomicU64 = AtomicU64::new(0);
/// Election-storm events; see `increment_raft_election_storms`.
pub static RAFT_ELECTION_STORMS: AtomicU64 = AtomicU64::new(0);
/// Gauge set by `set_raft_election_window_count`.
pub static RAFT_ELECTION_WINDOW_COUNT: AtomicU64 = AtomicU64::new(0);
/// Most recently recorded election round duration in milliseconds; see
/// `record_raft_election_round_ms`.
pub static RAFT_ELECTION_ROUND_LAST_MS: AtomicU64 = AtomicU64::new(0);
115
/// Gauge of coordinator tick drift in milliseconds; see
/// `set_cluster_coordinator_tick_drift_ms`.
pub static CLUSTER_COORDINATOR_TICK_DRIFT_MS: AtomicU64 = AtomicU64::new(0);
118
/// Gauge of in-flight control RPCs; raised by `increment_control_rpc_in_flight`
/// and lowered by `decrement_control_rpc_in_flight`.
pub static CONTROL_RPC_IN_FLIGHT: AtomicU64 = AtomicU64::new(0);
/// Count of control RPC starvation events; see `increment_control_rpc_starvation`.
pub static CONTROL_RPC_STARVATION: AtomicU64 = AtomicU64::new(0);
122
// Read-path metrics.

/// Total read operations; see `increment_reads`.
pub static READS_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Total bytes read; see `increment_read_bytes`.
pub static READ_BYTES_TOTAL: AtomicU64 = AtomicU64::new(0);
/// Consumer throttling events; see `increment_consumer_throttled`.
pub static CONSUMER_THROTTLED: AtomicU64 = AtomicU64::new(0);
/// Zero-copy sends; see `increment_zero_copy_sends`.
pub static ZERO_COPY_SENDS: AtomicU64 = AtomicU64::new(0);
128
// Cluster state gauges. Boolean-like gauges hold 1 for true and 0 for false.

/// Leader node ID as last stored by `set_cluster_leader_id`.
pub static CLUSTER_LEADER_ID: AtomicU64 = AtomicU64::new(0);
/// Term as last stored by `set_cluster_current_term`.
pub static CLUSTER_CURRENT_TERM: AtomicU64 = AtomicU64::new(0);
/// Node count as last stored by `set_cluster_node_count`.
pub static CLUSTER_NODE_COUNT: AtomicU64 = AtomicU64::new(0);
/// Healthy node count as last stored by `set_cluster_healthy_nodes`.
pub static CLUSTER_HEALTHY_NODES: AtomicU64 = AtomicU64::new(0);
/// 1/0 flag stored by `set_cluster_is_leader`.
pub static CLUSTER_IS_LEADER: AtomicU64 = AtomicU64::new(0);
/// 1/0 flag stored by `set_cluster_quorum_available`.
pub static CLUSTER_QUORUM_AVAILABLE: AtomicU64 = AtomicU64::new(0);
/// 1/0 flag stored by `set_cluster_leader_ready`.
pub static CLUSTER_LEADER_READY: AtomicU64 = AtomicU64::new(0);
/// Milliseconds value stored by `set_cluster_leader_ready_transition_ms`.
pub static CLUSTER_LEADER_READY_TRANSITION_MS: AtomicU64 = AtomicU64::new(0);
/// Counter bumped by `increment_cluster_elected_not_ready_rejects`.
pub static CLUSTER_ELECTED_NOT_READY_REJECTS: AtomicU64 = AtomicU64::new(0);
/// Entry count stored by `set_cluster_apply_lag_entries`.
pub static CLUSTER_APPLY_LAG_ENTRIES: AtomicU64 = AtomicU64::new(0);
/// Entry count stored by `set_cluster_apply_lag_at_election`.
pub static CLUSTER_APPLY_LAG_AT_ELECTION: AtomicU64 = AtomicU64::new(0);
/// 1/0 flag stored by `set_cluster_coordinator_ready`. Unlike the other
/// flags it starts at 1, so `cluster_coordinator_ready()` reports `true`
/// until something explicitly clears it.
pub static CLUSTER_COORDINATOR_READY: AtomicU64 = AtomicU64::new(1);
142
// Replication gauges.

/// Replication lag in bytes as last stored by `set_replication_lag_bytes`.
pub static REPLICATION_LAG_BYTES: AtomicU64 = AtomicU64::new(0);
/// Milliseconds value last stored by `set_replication_last_sync_ms`.
pub static REPLICATION_LAST_SYNC_MS: AtomicU64 = AtomicU64::new(0);
/// Pending replication operations; set, incremented, and decremented by the
/// `*_replication_pending_ops` helpers.
pub static REPLICATION_PENDING_OPS: AtomicU64 = AtomicU64::new(0);
147
// Resync counters.

/// Resyncs started; see `increment_resync_started`.
pub static RESYNC_STARTED: AtomicU64 = AtomicU64::new(0);
/// Resyncs completed; see `increment_resync_completed`.
pub static RESYNC_COMPLETED: AtomicU64 = AtomicU64::new(0);
/// Resyncs failed; see `increment_resync_failed`.
pub static RESYNC_FAILED: AtomicU64 = AtomicU64::new(0);
/// Segments transferred during resync; see `increment_resync_segments_transferred`.
pub static RESYNC_SEGMENTS_TRANSFERRED: AtomicU64 = AtomicU64::new(0);
/// Bytes transferred during resync; see `increment_resync_bytes_transferred`.
pub static RESYNC_BYTES_TRANSFERRED: AtomicU64 = AtomicU64::new(0);
154
155#[inline]
156pub fn increment_records_ingested(count: u64) {
157 RECORDS_INGESTED.fetch_add(count, Ordering::Relaxed);
158}
159
160#[inline]
161pub fn increment_bytes_ingested(bytes: u64) {
162 BYTES_INGESTED.fetch_add(bytes, Ordering::Relaxed);
163}
164
165#[inline]
166pub fn increment_batches_written() {
167 BATCHES_WRITTEN.fetch_add(1, Ordering::Relaxed);
168}
169
170#[inline]
171pub fn increment_bytes_written(bytes: u64) {
172 BYTES_WRITTEN.fetch_add(bytes, Ordering::Relaxed);
173}
174
175#[inline]
176pub fn increment_io_submitted() {
177 IO_OPS_SUBMITTED.fetch_add(1, Ordering::Relaxed);
178}
179
180#[inline]
181pub fn increment_io_completed() {
182 IO_OPS_COMPLETED.fetch_add(1, Ordering::Relaxed);
183}
184
185#[inline]
186pub fn increment_connections() {
187 CONNECTIONS_ACTIVE.fetch_add(1, Ordering::Relaxed);
188 CONNECTIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
189}
190
191#[inline]
192pub fn decrement_connections() {
193 CONNECTIONS_ACTIVE.fetch_sub(1, Ordering::Relaxed);
194}
195
196#[inline]
197pub fn increment_crc_failures() {
198 CRC_FAILURES.fetch_add(1, Ordering::Relaxed);
199}
200
201#[inline]
202pub fn increment_backpressure() {
203 BACKPRESSURE_EVENTS.fetch_add(1, Ordering::Relaxed);
204}
205
206#[inline]
207pub fn increment_pool_exhausted() {
208 POOL_EXHAUSTED.fetch_add(1, Ordering::Relaxed);
209}
210
211#[inline]
212pub fn increment_numa_misaligned() {
213 NUMA_MISALIGNED.fetch_add(1, Ordering::Relaxed);
214}
215
216#[inline]
217pub fn increment_follower_evictions() {
218 FOLLOWER_EVICTIONS.fetch_add(1, Ordering::Relaxed);
219}
220
221#[inline]
222pub fn increment_follower_recoveries() {
223 FOLLOWER_RECOVERIES.fetch_add(1, Ordering::Relaxed);
224}
225
226#[inline]
227pub fn increment_quorum_failures() {
228 QUORUM_FAILURES.fetch_add(1, Ordering::Relaxed);
229}
230
231#[inline]
232pub fn increment_quorum_timeouts() {
233 QUORUM_TIMEOUTS.fetch_add(1, Ordering::Relaxed);
234}
235
236#[inline]
240pub fn set_hlc_drift_ms(drift_ms: u64) {
241 HLC_DRIFT_MS.store(drift_ms, Ordering::Relaxed);
242}
243
244#[inline]
246pub fn increment_hlc_drift_warnings() {
247 HLC_DRIFT_WARNINGS.fetch_add(1, Ordering::Relaxed);
248}
249
250#[inline]
252pub fn increment_hlc_drift_critical() {
253 HLC_DRIFT_CRITICAL.fetch_add(1, Ordering::Relaxed);
254}
255
256#[inline]
258pub fn increment_hlc_logical_exhausted() {
259 HLC_LOGICAL_EXHAUSTED.fetch_add(1, Ordering::Relaxed);
260}
261
262#[inline]
266pub fn increment_raft_elections_started() {
267 RAFT_ELECTIONS_STARTED.fetch_add(1, Ordering::Relaxed);
268}
269
270#[inline]
272pub fn increment_raft_elections_won() {
273 RAFT_ELECTIONS_WON.fetch_add(1, Ordering::Relaxed);
274}
275
276#[inline]
278pub fn increment_raft_pre_votes_rejected() {
279 RAFT_PRE_VOTES_REJECTED.fetch_add(1, Ordering::Relaxed);
280}
281
282#[inline]
284pub fn increment_raft_leader_stepdowns() {
285 RAFT_LEADER_STEPDOWNS.fetch_add(1, Ordering::Relaxed);
286}
287
288#[inline]
290pub fn increment_raft_fencing_rejections() {
291 RAFT_FENCING_REJECTIONS.fetch_add(1, Ordering::Relaxed);
292}
293
294#[inline]
296pub fn record_raft_leader_tenure_ms(ms: u64) {
297 RAFT_LEADER_TENURE_LAST_MS.store(ms, Ordering::Relaxed);
298 let ended = RAFT_LEADER_TENURE_ENDED.fetch_add(1, Ordering::Relaxed) + 1;
299 let sum = RAFT_LEADER_TENURE_SUM_MS.fetch_add(ms, Ordering::Relaxed) + ms;
300 let avg = if ended > 0 { sum / ended } else { 0 };
301 RAFT_LEADER_TENURE_AVG_MS.store(avg, Ordering::Relaxed);
302 metrics::histogram!("lance_raft_leader_tenure_ms").record(ms as f64);
303}
304
305#[inline]
307pub fn increment_raft_election_storms() {
308 RAFT_ELECTION_STORMS.fetch_add(1, Ordering::Relaxed);
309}
310
311#[inline]
313pub fn set_raft_election_window_count(count: u64) {
314 RAFT_ELECTION_WINDOW_COUNT.store(count, Ordering::Relaxed);
315}
316
317#[inline]
319pub fn record_raft_election_round_ms(ms: u64) {
320 RAFT_ELECTION_ROUND_LAST_MS.store(ms, Ordering::Relaxed);
321 metrics::histogram!("lance_raft_election_round_ms").record(ms as f64);
322}
323
324#[inline]
326pub fn record_raft_pre_vote_rpc_latency_ms(ms: u64) {
327 metrics::histogram!("lance_raft_pre_vote_rpc_latency_ms").record(ms as f64);
328}
329
330#[inline]
332pub fn record_raft_vote_rpc_latency_ms(ms: u64) {
333 metrics::histogram!("lance_raft_vote_rpc_latency_ms").record(ms as f64);
334}
335
336#[inline]
340pub fn increment_reads() {
341 READS_TOTAL.fetch_add(1, Ordering::Relaxed);
342}
343
344#[inline]
346pub fn increment_read_bytes(bytes: u64) {
347 READ_BYTES_TOTAL.fetch_add(bytes, Ordering::Relaxed);
348}
349
350#[inline]
352pub fn increment_consumer_throttled() {
353 CONSUMER_THROTTLED.fetch_add(1, Ordering::Relaxed);
354}
355
356#[inline]
358pub fn increment_zero_copy_sends() {
359 ZERO_COPY_SENDS.fetch_add(1, Ordering::Relaxed);
360}
361
362#[inline]
366pub fn set_cluster_leader_id(leader_id: u16) {
367 CLUSTER_LEADER_ID.store(leader_id as u64, Ordering::Relaxed);
368}
369
370#[inline]
372pub fn set_cluster_current_term(term: u64) {
373 CLUSTER_CURRENT_TERM.store(term, Ordering::Relaxed);
374}
375
376#[inline]
378pub fn set_cluster_node_count(count: usize) {
379 CLUSTER_NODE_COUNT.store(count as u64, Ordering::Relaxed);
380}
381
382#[inline]
384pub fn set_cluster_healthy_nodes(count: usize) {
385 CLUSTER_HEALTHY_NODES.store(count as u64, Ordering::Relaxed);
386}
387
388#[inline]
390pub fn set_cluster_is_leader(is_leader: bool) {
391 CLUSTER_IS_LEADER.store(if is_leader { 1 } else { 0 }, Ordering::Relaxed);
392}
393
394#[inline]
396pub fn set_cluster_quorum_available(available: bool) {
397 CLUSTER_QUORUM_AVAILABLE.store(if available { 1 } else { 0 }, Ordering::Relaxed);
398}
399
400#[inline]
402pub fn set_cluster_leader_ready(ready: bool) {
403 CLUSTER_LEADER_READY.store(if ready { 1 } else { 0 }, Ordering::Relaxed);
404}
405
406#[inline]
408pub fn set_cluster_leader_ready_transition_ms(ms: u64) {
409 CLUSTER_LEADER_READY_TRANSITION_MS.store(ms, Ordering::Relaxed);
410}
411
412#[inline]
414pub fn increment_cluster_elected_not_ready_rejects() {
415 CLUSTER_ELECTED_NOT_READY_REJECTS.fetch_add(1, Ordering::Relaxed);
416}
417
418#[inline]
420pub fn set_cluster_apply_lag_entries(entries: u64) {
421 CLUSTER_APPLY_LAG_ENTRIES.store(entries, Ordering::Relaxed);
422}
423
424#[inline]
426pub fn set_cluster_apply_lag_at_election(entries: u64) {
427 CLUSTER_APPLY_LAG_AT_ELECTION.store(entries, Ordering::Relaxed);
428}
429
430#[inline]
432pub fn set_cluster_coordinator_ready(ready: bool) {
433 CLUSTER_COORDINATOR_READY.store(if ready { 1 } else { 0 }, Ordering::Relaxed);
434}
435
436#[inline]
438pub fn set_cluster_coordinator_tick_drift_ms(ms: u64) {
439 CLUSTER_COORDINATOR_TICK_DRIFT_MS.store(ms, Ordering::Relaxed);
440}
441
442#[inline]
444pub fn cluster_coordinator_ready() -> bool {
445 CLUSTER_COORDINATOR_READY.load(Ordering::Relaxed) == 1
446}
447
448#[inline]
452pub fn set_replication_lag_bytes(bytes: u64) {
453 REPLICATION_LAG_BYTES.store(bytes, Ordering::Relaxed);
454}
455
456#[inline]
458pub fn set_replication_last_sync_ms(ms: u64) {
459 REPLICATION_LAST_SYNC_MS.store(ms, Ordering::Relaxed);
460}
461
462#[inline]
464pub fn set_replication_pending_ops(count: u64) {
465 REPLICATION_PENDING_OPS.store(count, Ordering::Relaxed);
466}
467
468#[inline]
470pub fn increment_replication_pending_ops() {
471 REPLICATION_PENDING_OPS.fetch_add(1, Ordering::Relaxed);
472}
473
474#[inline]
476pub fn decrement_replication_pending_ops() {
477 REPLICATION_PENDING_OPS.fetch_sub(1, Ordering::Relaxed);
478}
479
480#[inline]
482pub fn increment_control_rpc_in_flight() {
483 CONTROL_RPC_IN_FLIGHT.fetch_add(1, Ordering::Relaxed);
484}
485
486#[inline]
488pub fn decrement_control_rpc_in_flight() {
489 CONTROL_RPC_IN_FLIGHT.fetch_sub(1, Ordering::Relaxed);
490}
491
492#[inline]
494pub fn increment_control_rpc_starvation() {
495 CONTROL_RPC_STARVATION.fetch_add(1, Ordering::Relaxed);
496}
497
498#[inline]
502pub fn increment_resync_started() {
503 RESYNC_STARTED.fetch_add(1, Ordering::Relaxed);
504}
505
506#[inline]
508pub fn increment_resync_completed() {
509 RESYNC_COMPLETED.fetch_add(1, Ordering::Relaxed);
510}
511
512#[inline]
514pub fn increment_resync_failed() {
515 RESYNC_FAILED.fetch_add(1, Ordering::Relaxed);
516}
517
518#[inline]
520pub fn increment_resync_segments_transferred(count: u64) {
521 RESYNC_SEGMENTS_TRANSFERRED.fetch_add(count, Ordering::Relaxed);
522}
523
524#[inline]
526pub fn increment_resync_bytes_transferred(bytes: u64) {
527 RESYNC_BYTES_TRANSFERRED.fetch_add(bytes, Ordering::Relaxed);
528}
529
/// Point-in-time copy of the global metric atomics, produced by
/// `MetricsSnapshot::capture`.
///
/// Every field mirrors the identically named (upper-cased) static. Flag-style
/// gauges such as `cluster_is_leader` are carried as `u64` values of 1 or 0.
///
/// Derives `Debug`, `Clone`, `PartialEq`, and `Eq` so snapshots can be logged,
/// duplicated, and compared (for example to detect change between two
/// captures).
// NOTE(review): the `QUORUM_TIMEOUTS` static has no corresponding field here —
// confirm whether that omission is intentional.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MetricsSnapshot {
    // Ingest, write-path, connection, and quorum metrics.
    pub records_ingested: u64,
    pub bytes_ingested: u64,
    pub batches_written: u64,
    pub bytes_written: u64,
    pub io_ops_submitted: u64,
    pub io_ops_completed: u64,
    pub connections_active: u64,
    pub connections_total: u64,
    pub crc_failures: u64,
    pub backpressure_events: u64,
    pub pool_exhausted: u64,
    pub numa_misaligned: u64,
    pub follower_evictions: u64,
    pub follower_recoveries: u64,
    pub quorum_failures: u64,
    // HLC metrics.
    pub hlc_drift_ms: u64,
    pub hlc_drift_warnings: u64,
    pub hlc_drift_critical: u64,
    pub hlc_logical_exhausted: u64,
    // Raft election and leadership metrics.
    pub raft_elections_started: u64,
    pub raft_elections_won: u64,
    pub raft_pre_votes_rejected: u64,
    pub raft_leader_stepdowns: u64,
    pub raft_fencing_rejections: u64,
    pub raft_leader_tenure_ended: u64,
    pub raft_leader_tenure_last_ms: u64,
    pub raft_leader_tenure_avg_ms: u64,
    pub raft_election_storms: u64,
    pub raft_election_window_count: u64,
    pub raft_election_round_last_ms: u64,
    // Read-path metrics.
    pub reads_total: u64,
    pub read_bytes_total: u64,
    pub consumer_throttled: u64,
    pub zero_copy_sends: u64,
    // Cluster state gauges.
    pub cluster_leader_id: u64,
    pub cluster_current_term: u64,
    pub cluster_node_count: u64,
    pub cluster_healthy_nodes: u64,
    pub cluster_is_leader: u64,
    pub cluster_quorum_available: u64,
    pub cluster_leader_ready: u64,
    pub cluster_leader_ready_transition_ms: u64,
    pub cluster_elected_not_ready_rejects: u64,
    pub cluster_apply_lag_entries: u64,
    pub cluster_apply_lag_at_election: u64,
    pub cluster_coordinator_ready: u64,
    pub cluster_coordinator_tick_drift_ms: u64,
    // Control-plane RPC metrics.
    pub control_rpc_in_flight: u64,
    pub control_rpc_starvation: u64,
    // Replication gauges.
    pub replication_lag_bytes: u64,
    pub replication_last_sync_ms: u64,
    pub replication_pending_ops: u64,
    // Resync counters.
    pub resync_started: u64,
    pub resync_completed: u64,
    pub resync_failed: u64,
    pub resync_segments_transferred: u64,
    pub resync_bytes_transferred: u64,
}
596
597impl MetricsSnapshot {
598 #[must_use]
599 pub fn capture() -> Self {
600 Self {
601 records_ingested: RECORDS_INGESTED.load(Ordering::Relaxed),
602 bytes_ingested: BYTES_INGESTED.load(Ordering::Relaxed),
603 batches_written: BATCHES_WRITTEN.load(Ordering::Relaxed),
604 bytes_written: BYTES_WRITTEN.load(Ordering::Relaxed),
605 io_ops_submitted: IO_OPS_SUBMITTED.load(Ordering::Relaxed),
606 io_ops_completed: IO_OPS_COMPLETED.load(Ordering::Relaxed),
607 connections_active: CONNECTIONS_ACTIVE.load(Ordering::Relaxed),
608 connections_total: CONNECTIONS_TOTAL.load(Ordering::Relaxed),
609 crc_failures: CRC_FAILURES.load(Ordering::Relaxed),
610 backpressure_events: BACKPRESSURE_EVENTS.load(Ordering::Relaxed),
611 pool_exhausted: POOL_EXHAUSTED.load(Ordering::Relaxed),
612 numa_misaligned: NUMA_MISALIGNED.load(Ordering::Relaxed),
613 follower_evictions: FOLLOWER_EVICTIONS.load(Ordering::Relaxed),
614 follower_recoveries: FOLLOWER_RECOVERIES.load(Ordering::Relaxed),
615 quorum_failures: QUORUM_FAILURES.load(Ordering::Relaxed),
616 hlc_drift_ms: HLC_DRIFT_MS.load(Ordering::Relaxed),
618 hlc_drift_warnings: HLC_DRIFT_WARNINGS.load(Ordering::Relaxed),
619 hlc_drift_critical: HLC_DRIFT_CRITICAL.load(Ordering::Relaxed),
620 hlc_logical_exhausted: HLC_LOGICAL_EXHAUSTED.load(Ordering::Relaxed),
621 raft_elections_started: RAFT_ELECTIONS_STARTED.load(Ordering::Relaxed),
623 raft_elections_won: RAFT_ELECTIONS_WON.load(Ordering::Relaxed),
624 raft_pre_votes_rejected: RAFT_PRE_VOTES_REJECTED.load(Ordering::Relaxed),
625 raft_leader_stepdowns: RAFT_LEADER_STEPDOWNS.load(Ordering::Relaxed),
626 raft_fencing_rejections: RAFT_FENCING_REJECTIONS.load(Ordering::Relaxed),
627 raft_leader_tenure_ended: RAFT_LEADER_TENURE_ENDED.load(Ordering::Relaxed),
628 raft_leader_tenure_last_ms: RAFT_LEADER_TENURE_LAST_MS.load(Ordering::Relaxed),
629 raft_leader_tenure_avg_ms: RAFT_LEADER_TENURE_AVG_MS.load(Ordering::Relaxed),
630 raft_election_storms: RAFT_ELECTION_STORMS.load(Ordering::Relaxed),
631 raft_election_window_count: RAFT_ELECTION_WINDOW_COUNT.load(Ordering::Relaxed),
632 raft_election_round_last_ms: RAFT_ELECTION_ROUND_LAST_MS.load(Ordering::Relaxed),
633 reads_total: READS_TOTAL.load(Ordering::Relaxed),
635 read_bytes_total: READ_BYTES_TOTAL.load(Ordering::Relaxed),
636 consumer_throttled: CONSUMER_THROTTLED.load(Ordering::Relaxed),
637 zero_copy_sends: ZERO_COPY_SENDS.load(Ordering::Relaxed),
638 cluster_leader_id: CLUSTER_LEADER_ID.load(Ordering::Relaxed),
640 cluster_current_term: CLUSTER_CURRENT_TERM.load(Ordering::Relaxed),
641 cluster_node_count: CLUSTER_NODE_COUNT.load(Ordering::Relaxed),
642 cluster_healthy_nodes: CLUSTER_HEALTHY_NODES.load(Ordering::Relaxed),
643 cluster_is_leader: CLUSTER_IS_LEADER.load(Ordering::Relaxed),
644 cluster_quorum_available: CLUSTER_QUORUM_AVAILABLE.load(Ordering::Relaxed),
645 cluster_leader_ready: CLUSTER_LEADER_READY.load(Ordering::Relaxed),
646 cluster_leader_ready_transition_ms: CLUSTER_LEADER_READY_TRANSITION_MS
647 .load(Ordering::Relaxed),
648 cluster_elected_not_ready_rejects: CLUSTER_ELECTED_NOT_READY_REJECTS
649 .load(Ordering::Relaxed),
650 cluster_apply_lag_entries: CLUSTER_APPLY_LAG_ENTRIES.load(Ordering::Relaxed),
651 cluster_apply_lag_at_election: CLUSTER_APPLY_LAG_AT_ELECTION.load(Ordering::Relaxed),
652 cluster_coordinator_ready: CLUSTER_COORDINATOR_READY.load(Ordering::Relaxed),
653 cluster_coordinator_tick_drift_ms: CLUSTER_COORDINATOR_TICK_DRIFT_MS
654 .load(Ordering::Relaxed),
655 control_rpc_in_flight: CONTROL_RPC_IN_FLIGHT.load(Ordering::Relaxed),
657 control_rpc_starvation: CONTROL_RPC_STARVATION.load(Ordering::Relaxed),
658 replication_lag_bytes: REPLICATION_LAG_BYTES.load(Ordering::Relaxed),
660 replication_last_sync_ms: REPLICATION_LAST_SYNC_MS.load(Ordering::Relaxed),
661 replication_pending_ops: REPLICATION_PENDING_OPS.load(Ordering::Relaxed),
662 resync_started: RESYNC_STARTED.load(Ordering::Relaxed),
664 resync_completed: RESYNC_COMPLETED.load(Ordering::Relaxed),
665 resync_failed: RESYNC_FAILED.load(Ordering::Relaxed),
666 resync_segments_transferred: RESYNC_SEGMENTS_TRANSFERRED.load(Ordering::Relaxed),
667 resync_bytes_transferred: RESYNC_BYTES_TRANSFERRED.load(Ordering::Relaxed),
668 }
669 }
670}
671
672pub fn init_prometheus_exporter(
673 addr: SocketAddr,
674) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
675 let builder = metrics_exporter_prometheus::PrometheusBuilder::new();
676 builder.with_http_listener(addr).install()?;
677
678 metrics::describe_counter!(
679 "lance_records_ingested_total",
680 "Total number of records ingested"
681 );
682 metrics::describe_counter!("lance_bytes_ingested_total", "Total bytes ingested");
683 metrics::describe_counter!(
684 "lance_batches_written_total",
685 "Total batches written to disk"
686 );
687 metrics::describe_counter!("lance_bytes_written_total", "Total bytes written to disk");
688 metrics::describe_counter!(
689 "lance_io_ops_submitted_total",
690 "Total I/O operations submitted"
691 );
692 metrics::describe_counter!(
693 "lance_io_ops_completed_total",
694 "Total I/O operations completed"
695 );
696 metrics::describe_gauge!(
697 "lance_connections_active",
698 "Current number of active connections"
699 );
700 metrics::describe_counter!(
701 "lance_connections_total",
702 "Total connections ever established"
703 );
704 metrics::describe_counter!("lance_crc_failures_total", "Total CRC validation failures");
705 metrics::describe_counter!(
706 "lance_backpressure_events_total",
707 "Total backpressure events"
708 );
709 metrics::describe_counter!(
710 "lance_pool_exhausted_total",
711 "Buffer pool exhaustion events"
712 );
713 metrics::describe_counter!("lance_numa_misaligned_total", "NUMA misalignment warnings");
714 metrics::describe_counter!(
715 "lance_follower_evictions_total",
716 "Followers evicted from quorum"
717 );
718 metrics::describe_counter!(
719 "lance_follower_recoveries_total",
720 "Followers recovered to quorum"
721 );
722 metrics::describe_counter!("lance_quorum_failures_total", "Quorum not reached events");
723
724 metrics::describe_gauge!(
726 "lance_hlc_drift_ms",
727 "Current HLC drift from wall clock in milliseconds"
728 );
729 metrics::describe_counter!(
730 "lance_hlc_drift_warnings_total",
731 "HLC drift warning events (drift > 100ms)"
732 );
733 metrics::describe_counter!(
734 "lance_hlc_drift_critical_total",
735 "HLC drift critical events (drift > 1s)"
736 );
737 metrics::describe_counter!(
738 "lance_hlc_logical_exhausted_total",
739 "HLC logical counter exhaustion events"
740 );
741
742 metrics::describe_counter!(
744 "lance_raft_elections_started_total",
745 "Raft elections started"
746 );
747 metrics::describe_counter!(
748 "lance_raft_elections_won_total",
749 "Raft elections won (became leader)"
750 );
751 metrics::describe_counter!(
752 "lance_raft_pre_votes_rejected_total",
753 "Raft pre-votes rejected"
754 );
755 metrics::describe_counter!("lance_raft_leader_stepdowns_total", "Raft leader stepdowns");
756 metrics::describe_counter!(
757 "lance_raft_fencing_rejections_total",
758 "Raft fencing token rejections"
759 );
760 metrics::describe_counter!(
761 "lance_raft_leader_tenure_ended_total",
762 "Count of completed leader tenures"
763 );
764 metrics::describe_gauge!(
765 "lance_raft_leader_tenure_last_ms",
766 "Most recent completed leader tenure duration in milliseconds"
767 );
768 metrics::describe_gauge!(
769 "lance_raft_leader_tenure_avg_ms",
770 "Rolling average completed leader tenure in milliseconds"
771 );
772 metrics::describe_counter!(
773 "lance_raft_election_storms_total",
774 "Election-storm windows detected"
775 );
776 metrics::describe_gauge!(
777 "lance_raft_election_window_count",
778 "Elections observed in the active storm window"
779 );
780 metrics::describe_gauge!(
781 "lance_raft_election_round_last_ms",
782 "Most recent election round duration in milliseconds"
783 );
784 metrics::describe_histogram!(
785 "lance_raft_leader_tenure_ms",
786 "Histogram of completed leader tenure durations (milliseconds)"
787 );
788 metrics::describe_histogram!(
789 "lance_raft_election_round_ms",
790 "Histogram of election round durations (milliseconds)"
791 );
792 metrics::describe_histogram!(
793 "lance_raft_pre_vote_rpc_latency_ms",
794 "Histogram of pre-vote RPC round-trip latency (milliseconds)"
795 );
796 metrics::describe_histogram!(
797 "lance_raft_vote_rpc_latency_ms",
798 "Histogram of vote RPC round-trip latency (milliseconds)"
799 );
800
801 metrics::describe_counter!("lance_reads_total", "Total read operations");
803 metrics::describe_counter!("lance_read_bytes_total", "Total bytes read");
804 metrics::describe_counter!(
805 "lance_consumer_throttled_total",
806 "Consumer rate limit events"
807 );
808 metrics::describe_counter!("lance_zero_copy_sends_total", "Zero-copy sends completed");
809
810 metrics::describe_gauge!("lance_cluster_leader_id", "Current cluster leader node ID");
812 metrics::describe_gauge!("lance_cluster_current_term", "Current Raft term");
813 metrics::describe_gauge!("lance_cluster_node_count", "Total nodes in cluster");
814 metrics::describe_gauge!(
815 "lance_cluster_healthy_nodes",
816 "Number of healthy (connected) nodes"
817 );
818 metrics::describe_gauge!(
819 "lance_cluster_is_leader",
820 "Whether this node is the leader (1=yes, 0=no)"
821 );
822 metrics::describe_gauge!(
823 "lance_cluster_quorum_available",
824 "Whether quorum is available (1=yes, 0=no)"
825 );
826 metrics::describe_gauge!(
827 "lance_cluster_leader_ready",
828 "Whether this node is elected leader and apply-caught-up (1=yes, 0=no)"
829 );
830 metrics::describe_gauge!(
831 "lance_cluster_leader_ready_transition_ms",
832 "Milliseconds from election win to leader-ready transition"
833 );
834 metrics::describe_counter!(
835 "lance_cluster_elected_not_ready_rejects_total",
836 "Requests rejected while node was leader but not yet ready"
837 );
838 metrics::describe_gauge!(
839 "lance_cluster_apply_lag_entries",
840 "Current commit_index - last_applied lag in entries"
841 );
842 metrics::describe_gauge!(
843 "lance_cluster_apply_lag_at_election",
844 "Apply lag sampled during leader readiness warm-up"
845 );
846 metrics::describe_gauge!(
847 "lance_cluster_coordinator_ready",
848 "Whether coordinator control-plane is healthy/readiness-eligible (1=yes, 0=no)"
849 );
850 metrics::describe_gauge!(
851 "lance_cluster_coordinator_tick_drift_ms",
852 "Coordinator election-check tick drift in milliseconds"
853 );
854
855 metrics::describe_gauge!(
856 "lance_control_rpc_in_flight",
857 "Current in-flight control-plane RPC count"
858 );
859 metrics::describe_counter!(
860 "lance_control_rpc_starvation_total",
861 "Control-plane RPC lock-contention/starvation events"
862 );
863
864 metrics::describe_gauge!(
866 "lance_replication_lag_bytes",
867 "Replication lag in bytes behind leader"
868 );
869 metrics::describe_gauge!(
870 "lance_replication_last_sync_ms",
871 "Time since last successful replication sync in milliseconds"
872 );
873 metrics::describe_gauge!(
874 "lance_replication_pending_ops",
875 "Number of pending replication operations"
876 );
877
878 metrics::describe_histogram!(
884 "lance_ingest_latency_seconds",
885 metrics::Unit::Seconds,
886 "Ingest request latency"
887 );
888 metrics::describe_histogram!(
889 "lance_fetch_latency_seconds",
890 metrics::Unit::Seconds,
891 "Fetch request latency"
892 );
893 metrics::describe_histogram!(
894 "lance_io_latency_seconds",
895 metrics::Unit::Seconds,
896 "I/O operation latency"
897 );
898 metrics::describe_histogram!(
899 "lance_replication_latency_seconds",
900 metrics::Unit::Seconds,
901 "Replication latency"
902 );
903 metrics::describe_histogram!(
904 "lance_network_latency_seconds",
905 metrics::Unit::Seconds,
906 "Network round-trip latency"
907 );
908
909 metrics::describe_gauge!(
911 "lance_ingest_ops_per_second",
912 "Ingest operations per second"
913 );
914 metrics::describe_gauge!(
915 "lance_ingest_bytes_per_second",
916 "Ingest throughput in bytes per second"
917 );
918 metrics::describe_gauge!("lance_read_ops_per_second", "Read operations per second");
919 metrics::describe_gauge!(
920 "lance_read_bytes_per_second",
921 "Read throughput in bytes per second"
922 );
923
924 metrics::describe_counter!("lance_errors_total", "Total errors by type");
926
927 metrics::describe_gauge!(
929 "lance_queue_saturation_ratio",
930 "Queue depth saturation (0.0-1.0)"
931 );
932 metrics::describe_gauge!(
933 "lance_memory_saturation_ratio",
934 "Memory saturation (0.0-1.0)"
935 );
936 metrics::describe_gauge!(
937 "lance_buffer_pool_saturation_ratio",
938 "Buffer pool saturation (0.0-1.0)"
939 );
940 metrics::describe_gauge!(
941 "lance_connection_saturation_ratio",
942 "Connection pool saturation (0.0-1.0)"
943 );
944 metrics::describe_gauge!("lance_pending_io_count", "Number of pending I/O operations");
945
946 Ok(())
947}
948
949pub fn export_to_prometheus() {
950 let snapshot = MetricsSnapshot::capture();
951
952 metrics::counter!("lance_records_ingested_total").absolute(snapshot.records_ingested);
953 metrics::counter!("lance_bytes_ingested_total").absolute(snapshot.bytes_ingested);
954 metrics::counter!("lance_batches_written_total").absolute(snapshot.batches_written);
955 metrics::counter!("lance_bytes_written_total").absolute(snapshot.bytes_written);
956 metrics::counter!("lance_io_ops_submitted_total").absolute(snapshot.io_ops_submitted);
957 metrics::counter!("lance_io_ops_completed_total").absolute(snapshot.io_ops_completed);
958 metrics::gauge!("lance_connections_active").set(snapshot.connections_active as f64);
959 metrics::counter!("lance_connections_total").absolute(snapshot.connections_total);
960 metrics::counter!("lance_crc_failures_total").absolute(snapshot.crc_failures);
961 metrics::counter!("lance_backpressure_events_total").absolute(snapshot.backpressure_events);
962 metrics::counter!("lance_pool_exhausted_total").absolute(snapshot.pool_exhausted);
963 metrics::counter!("lance_numa_misaligned_total").absolute(snapshot.numa_misaligned);
964 metrics::counter!("lance_follower_evictions_total").absolute(snapshot.follower_evictions);
965 metrics::counter!("lance_follower_recoveries_total").absolute(snapshot.follower_recoveries);
966 metrics::counter!("lance_quorum_failures_total").absolute(snapshot.quorum_failures);
967
968 metrics::gauge!("lance_hlc_drift_ms").set(snapshot.hlc_drift_ms as f64);
970 metrics::counter!("lance_hlc_drift_warnings_total").absolute(snapshot.hlc_drift_warnings);
971 metrics::counter!("lance_hlc_drift_critical_total").absolute(snapshot.hlc_drift_critical);
972 metrics::counter!("lance_hlc_logical_exhausted_total").absolute(snapshot.hlc_logical_exhausted);
973
974 metrics::counter!("lance_raft_elections_started_total")
976 .absolute(snapshot.raft_elections_started);
977 metrics::counter!("lance_raft_elections_won_total").absolute(snapshot.raft_elections_won);
978 metrics::counter!("lance_raft_pre_votes_rejected_total")
979 .absolute(snapshot.raft_pre_votes_rejected);
980 metrics::counter!("lance_raft_leader_stepdowns_total").absolute(snapshot.raft_leader_stepdowns);
981 metrics::counter!("lance_raft_fencing_rejections_total")
982 .absolute(snapshot.raft_fencing_rejections);
983 metrics::counter!("lance_raft_leader_tenure_ended_total")
984 .absolute(snapshot.raft_leader_tenure_ended);
985 metrics::gauge!("lance_raft_leader_tenure_last_ms")
986 .set(snapshot.raft_leader_tenure_last_ms as f64);
987 metrics::gauge!("lance_raft_leader_tenure_avg_ms")
988 .set(snapshot.raft_leader_tenure_avg_ms as f64);
989 metrics::counter!("lance_raft_election_storms_total").absolute(snapshot.raft_election_storms);
990 metrics::gauge!("lance_raft_election_window_count")
991 .set(snapshot.raft_election_window_count as f64);
992 metrics::gauge!("lance_raft_election_round_last_ms")
993 .set(snapshot.raft_election_round_last_ms as f64);
994
995 metrics::counter!("lance_reads_total").absolute(snapshot.reads_total);
997 metrics::counter!("lance_read_bytes_total").absolute(snapshot.read_bytes_total);
998 metrics::counter!("lance_consumer_throttled_total").absolute(snapshot.consumer_throttled);
999 metrics::counter!("lance_zero_copy_sends_total").absolute(snapshot.zero_copy_sends);
1000
1001 metrics::gauge!("lance_cluster_leader_id").set(snapshot.cluster_leader_id as f64);
1003 metrics::gauge!("lance_cluster_current_term").set(snapshot.cluster_current_term as f64);
1004 metrics::gauge!("lance_cluster_node_count").set(snapshot.cluster_node_count as f64);
1005 metrics::gauge!("lance_cluster_healthy_nodes").set(snapshot.cluster_healthy_nodes as f64);
1006 metrics::gauge!("lance_cluster_is_leader").set(snapshot.cluster_is_leader as f64);
1007 metrics::gauge!("lance_cluster_quorum_available").set(snapshot.cluster_quorum_available as f64);
1008 metrics::gauge!("lance_cluster_leader_ready").set(snapshot.cluster_leader_ready as f64);
1009 metrics::gauge!("lance_cluster_leader_ready_transition_ms")
1010 .set(snapshot.cluster_leader_ready_transition_ms as f64);
1011 metrics::counter!("lance_cluster_elected_not_ready_rejects_total")
1012 .absolute(snapshot.cluster_elected_not_ready_rejects);
1013 metrics::gauge!("lance_cluster_apply_lag_entries")
1014 .set(snapshot.cluster_apply_lag_entries as f64);
1015 metrics::gauge!("lance_cluster_apply_lag_at_election")
1016 .set(snapshot.cluster_apply_lag_at_election as f64);
1017 metrics::gauge!("lance_cluster_coordinator_ready")
1018 .set(snapshot.cluster_coordinator_ready as f64);
1019 metrics::gauge!("lance_cluster_coordinator_tick_drift_ms")
1020 .set(snapshot.cluster_coordinator_tick_drift_ms as f64);
1021
1022 metrics::gauge!("lance_control_rpc_in_flight").set(snapshot.control_rpc_in_flight as f64);
1023 metrics::counter!("lance_control_rpc_starvation_total")
1024 .absolute(snapshot.control_rpc_starvation);
1025
1026 metrics::gauge!("lance_replication_lag_bytes").set(snapshot.replication_lag_bytes as f64);
1028 metrics::gauge!("lance_replication_last_sync_ms").set(snapshot.replication_last_sync_ms as f64);
1029 metrics::gauge!("lance_replication_pending_ops").set(snapshot.replication_pending_ops as f64);
1030
1031 metrics::counter!("lance_resync_started_total").absolute(snapshot.resync_started);
1033 metrics::counter!("lance_resync_completed_total").absolute(snapshot.resync_completed);
1034 metrics::counter!("lance_resync_failed_total").absolute(snapshot.resync_failed);
1035 metrics::counter!("lance_resync_segments_transferred_total")
1036 .absolute(snapshot.resync_segments_transferred);
1037 metrics::counter!("lance_resync_bytes_transferred_total")
1038 .absolute(snapshot.resync_bytes_transferred);
1039
1040 let gs = GoldenSignalsSnapshot::capture();
1045
1046 export_latency_percentiles("lance_ingest_latency", &gs.latency_ingest);
1048 export_latency_percentiles("lance_fetch_latency", &gs.latency_fetch);
1049 export_latency_percentiles("lance_io_latency", &gs.latency_io);
1050 export_latency_percentiles("lance_replication_latency", &gs.latency_replication);
1051 export_latency_percentiles("lance_network_latency", &gs.latency_network);
1052
1053 metrics::gauge!("lance_ingest_ops_per_second").set(gs.traffic_ingest_ops_per_sec);
1055 metrics::gauge!("lance_ingest_bytes_per_second").set(gs.traffic_ingest_bytes_per_sec);
1056 metrics::gauge!("lance_read_ops_per_second").set(gs.traffic_read_ops_per_sec);
1057 metrics::gauge!("lance_read_bytes_per_second").set(gs.traffic_read_bytes_per_sec);
1058
1059 for (i, &count) in gs.errors_by_type.iter().enumerate() {
1061 let error_type = match i {
1062 0 => "protocol",
1063 1 => "io",
1064 2 => "timeout",
1065 3 => "checksum",
1066 4 => "replication",
1067 5 => "backpressure",
1068 6 => "auth",
1069 7 => "internal",
1070 _ => "unknown",
1071 };
1072 metrics::counter!("lance_errors_total", "type" => error_type).absolute(count);
1073 }
1074
1075 metrics::gauge!("lance_queue_saturation_ratio").set(gs.saturation_queue);
1077 metrics::gauge!("lance_memory_saturation_ratio").set(gs.saturation_memory);
1078 metrics::gauge!("lance_buffer_pool_saturation_ratio").set(gs.saturation_buffer_pool);
1079 metrics::gauge!("lance_connection_saturation_ratio").set(gs.saturation_connections);
1080 metrics::gauge!("lance_pending_io_count").set(gs.saturation_pending_io as f64);
1081}
1082
1083fn export_latency_percentiles(prefix: &str, snap: &LatencySnapshot) {
1084 let p50_sec = snap.p50() as f64 / 1_000_000.0;
1085 let p95_sec = snap.p95() as f64 / 1_000_000.0;
1086 let p99_sec = snap.p99() as f64 / 1_000_000.0;
1087 let p999_sec = snap.p999() as f64 / 1_000_000.0;
1088 let avg_sec = snap.avg_us() / 1_000_000.0;
1089
1090 metrics::gauge!(format!("{}_p50_seconds", prefix)).set(p50_sec);
1091 metrics::gauge!(format!("{}_p95_seconds", prefix)).set(p95_sec);
1092 metrics::gauge!(format!("{}_p99_seconds", prefix)).set(p99_sec);
1093 metrics::gauge!(format!("{}_p999_seconds", prefix)).set(p999_sec);
1094 metrics::gauge!(format!("{}_avg_seconds", prefix)).set(avg_sec);
1095 metrics::gauge!(format!("{}_count", prefix)).set(snap.count as f64);
1096}