1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
use std::time::{Duration, Instant};
#[cfg(feature = "metrix")]
pub use self::metrix::MetrixCollector;
#[cfg(feature = "metrix")]
mod metrix;
pub trait MetricsCollector {
fn streaming_connect_attempt(&self);
fn streaming_connect_attempt_failed(&self);
fn consumer_connected(&self, attempt_started: Instant);
fn consumer_connection_lifetime(&self, connected_since: Instant);
fn consumer_line_received(&self, bytes: usize);
fn consumer_info_line_received(&self, bytes: usize);
fn consumer_keep_alive_line_received(&self, bytes: usize);
fn consumer_batch_line_received(&self, bytes: usize);
fn consumer_batch_received(&self, batch_received_at_timestamp: Instant);
fn dispatcher_batch_received(&self, batch_received_at_timestamp: Instant);
fn dispatcher_current_workers(&self, num_workers: usize);
fn worker_worker_started(&self);
fn worker_worker_stopped(&self);
fn worker_batch_received(&self, batch_received_at_timestamp: Instant);
fn worker_batch_size_bytes(&self, bytes: usize);
fn worker_batch_processed(&self, started: Instant);
fn worker_events_in_same_batch_processed(&self, n: usize);
fn committer_batch_received(&self, batch_received_at_timestamp: Instant);
fn committer_cursor_commit_attempt(&self, commit_attempt_started: Instant);
fn committer_cursor_committed(&self, commit_attempt_started: Instant);
fn committer_cursor_commit_failed(&self, commit_attempt_started: Instant);
fn committer_batches_committed(&self, n: usize);
fn committer_events_committed(&self, n: usize);
fn committer_first_cursor_age_on_commit(&self, age: Duration);
fn committer_last_cursor_age_on_commit(&self, age: Duration);
fn committer_cursor_buffer_time(&self, time_buffered: Duration);
fn committer_time_left_on_commit_until_invalid(&self, time_left: Duration);
fn other_panicked(&self);
fn other_dispatcher_gone(&self);
fn other_worker_gone(&self);
fn other_committer_gone(&self);
}
#[derive(Clone)]
pub struct DevNullMetricsCollector;
impl MetricsCollector for DevNullMetricsCollector {
fn streaming_connect_attempt(&self) {}
fn streaming_connect_attempt_failed(&self) {}
fn consumer_connected(&self, _attempt_started: Instant) {}
fn consumer_connection_lifetime(&self, _connected_since: Instant) {}
fn consumer_line_received(&self, _bytes: usize) {}
fn consumer_info_line_received(&self, _bytes: usize) {}
fn consumer_keep_alive_line_received(&self, _bytes: usize) {}
fn consumer_batch_line_received(&self, _bytes: usize) {}
fn consumer_batch_received(&self, _batch_received_at_timestamp: Instant) {}
fn dispatcher_batch_received(&self, _batch_received_at_timestamp: Instant) {}
fn dispatcher_current_workers(&self, _num_workers: usize) {}
fn worker_batch_received(&self, _batch_received_at_timestamp: Instant) {}
fn worker_worker_started(&self) {}
fn worker_worker_stopped(&self) {}
fn worker_batch_size_bytes(&self, _bytes: usize) {}
fn worker_batch_processed(&self, _started: Instant) {}
fn worker_events_in_same_batch_processed(&self, _n: usize) {}
fn committer_batch_received(&self, _batch_received_at_timestamp: Instant) {}
fn committer_cursor_committed(&self, _commit_attempt_started: Instant) {}
fn committer_batches_committed(&self, _n: usize) {}
fn committer_events_committed(&self, _n: usize) {}
fn committer_cursor_commit_attempt(&self, _commit_attempt_started: Instant) {}
fn committer_cursor_commit_failed(&self, _commit_attempt_started: Instant) {}
fn committer_first_cursor_age_on_commit(&self, _age: Duration) {}
fn committer_last_cursor_age_on_commit(&self, _age: Duration) {}
fn committer_cursor_buffer_time(&self, _time_buffered: Duration) {}
fn committer_time_left_on_commit_until_invalid(&self, _time_left: Duration) {}
fn other_panicked(&self) {}
fn other_dispatcher_gone(&self) {}
fn other_worker_gone(&self) {}
fn other_committer_gone(&self) {}
}