1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.
//! Statistics management + collection for factories
use std::sync::Arc;
use crate::concurrency::Duration;
use crate::concurrency::Instant;
use crate::factory::JobOptions;
/// Adapter over whichever statistics collection a user wants to plug
/// into the [super::Factory].
///
/// Every hook receives the name of the reporting factory as `factory`.
pub trait FactoryStatsLayer: Send + Sync + 'static {
    /// Invoked once a factory ping has been received.
    ///
    /// `sent` is the instant the ping was sent, so the span between `sent`
    /// and now is the ping latency on the factory.
    fn factory_ping_received(&self, factory: &str, sent: Instant);

    /// Invoked when a worker answers a ping request from the factory.
    ///
    /// `elapsed` is the time between the ping being sent and the factory
    /// processing the ping response. It measures worker "free" latency and
    /// helps identify "stuck" or slow workers.
    fn worker_ping_received(&self, factory: &str, elapsed: Duration);

    /// Invoked for every new incoming job.
    fn new_job(&self, factory: &str);

    /// Invoked when a job completes, to report factory processing time,
    /// worker processing time, total processing time, and job count.
    ///
    /// These metrics allow deriving:
    /// 1. Time in factory's queue = factory_job_processing_latency_usec - worker_job_processing_latency_usec
    /// 2. Time in worker's queue + being processed by worker = worker_job_processing_latency_usec
    /// 3. Total time since submission = job_processing_latency_usec
    fn job_completed(&self, factory: &str, options: &JobOptions);

    /// Invoked when a job is discarded.
    fn job_discarded(&self, factory: &str);

    /// Invoked when a job is rate limited.
    fn job_rate_limited(&self, factory: &str);

    /// Invoked when jobs exceed their TTL while waiting in the factory's
    /// queue; `num_removed` is how many jobs were removed.
    fn job_ttl_expired(&self, factory: &str, num_removed: usize);

    /// Fixed-period report of the factory's queue depth.
    fn record_queue_depth(&self, factory: &str, depth: usize);

    /// Fixed-period report of the factory's number of processed messages.
    fn record_processing_messages_count(&self, factory: &str, count: usize);

    /// Fixed-period report of the factory's in-flight message count
    /// (processing + queued).
    ///
    /// Ships with an empty default implementation for backwards
    /// compatibility, so existing implementors need not provide it.
    fn record_in_flight_messages_count(&self, factory: &str, count: usize) {
        // Deliberate no-op: discard the arguments explicitly so the unused
        // bindings do not trigger a lint.
        let _ = (factory, count);
    }

    /// Fixed-period report of the factory's number of workers.
    fn record_worker_count(&self, factory: &str, count: usize);

    /// Fixed-period report of the factory's maximum allowed queue size.
    fn record_queue_limit(&self, factory: &str, count: usize);
}
/// Lets an optional, shared stats layer be used directly as a
/// [FactoryStatsLayer]: each hook forwards to the wrapped layer when one is
/// present and does nothing when the value is `None`.
impl FactoryStatsLayer for Option<Arc<dyn FactoryStatsLayer>> {
    /// Forwards the factory ping event (with its `sent` instant) to the
    /// wrapped layer, if any.
    fn factory_ping_received(&self, factory: &str, sent: Instant) {
        if let Some(layer) = self.as_deref() {
            layer.factory_ping_received(factory, sent);
        }
    }

    /// Forwards the worker ping reply (with its `elapsed` duration) to the
    /// wrapped layer, if any.
    fn worker_ping_received(&self, factory: &str, elapsed: Duration) {
        if let Some(layer) = self.as_deref() {
            layer.worker_ping_received(factory, elapsed);
        }
    }

    /// Forwards the new-job event to the wrapped layer, if any.
    fn new_job(&self, factory: &str) {
        if let Some(layer) = self.as_deref() {
            layer.new_job(factory);
        }
    }

    /// Forwards the job-completed event, together with the job's `options`,
    /// to the wrapped layer, if any.
    fn job_completed(&self, factory: &str, options: &JobOptions) {
        if let Some(layer) = self.as_deref() {
            layer.job_completed(factory, options);
        }
    }

    /// Forwards the job-discarded event to the wrapped layer, if any.
    fn job_discarded(&self, factory: &str) {
        if let Some(layer) = self.as_deref() {
            layer.job_discarded(factory);
        }
    }

    /// Forwards the job-rate-limited event to the wrapped layer, if any.
    fn job_rate_limited(&self, factory: &str) {
        if let Some(layer) = self.as_deref() {
            layer.job_rate_limited(factory);
        }
    }

    /// Forwards the TTL-expiry event and the number of removed jobs to the
    /// wrapped layer, if any.
    fn job_ttl_expired(&self, factory: &str, num_removed: usize) {
        if let Some(layer) = self.as_deref() {
            layer.job_ttl_expired(factory, num_removed);
        }
    }

    /// Forwards the periodic queue-depth reading to the wrapped layer, if any.
    fn record_queue_depth(&self, factory: &str, depth: usize) {
        if let Some(layer) = self.as_deref() {
            layer.record_queue_depth(factory, depth);
        }
    }

    /// Forwards the periodic processed-messages count to the wrapped layer,
    /// if any.
    fn record_processing_messages_count(&self, factory: &str, count: usize) {
        if let Some(layer) = self.as_deref() {
            layer.record_processing_messages_count(factory, count);
        }
    }

    /// Forwards the periodic in-flight message count (processing + queued)
    /// to the wrapped layer, if any.
    fn record_in_flight_messages_count(&self, factory: &str, count: usize) {
        if let Some(layer) = self.as_deref() {
            layer.record_in_flight_messages_count(factory, count);
        }
    }

    /// Forwards the periodic worker count to the wrapped layer, if any.
    fn record_worker_count(&self, factory: &str, count: usize) {
        if let Some(layer) = self.as_deref() {
            layer.record_worker_count(factory, count);
        }
    }

    /// Forwards the periodic maximum-queue-size reading to the wrapped
    /// layer, if any.
    fn record_queue_limit(&self, factory: &str, count: usize) {
        if let Some(layer) = self.as_deref() {
            layer.record_queue_limit(factory, count);
        }
    }
}