ractor/factory/discard.rs
1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Discard handler managing when jobs are discarded
7use super::Job;
8use super::JobKey;
9use crate::Message;
10
11/// The discard mode of a factory
12#[derive(Debug, Eq, PartialEq, Clone, Copy)]
13pub enum DiscardMode {
14 /// Discard oldest incoming jobs under backpressure
15 Oldest,
16 /// Discard newest incoming jobs under backpressure
17 Newest,
18}
19
20/// A worker's copy of the discard settings.
21///
22/// Originally we passed a cloned box of the dynamic calculator to the workers,
23/// but what that would do is have every worker re-compute the discard limit
24/// which if you have many thousands of workers is kind of useless.
25///
26/// Instead now we have the workers treat the limit as static, but have the
27/// factory compute the limit on it's interval and propagate the limit to the
28/// workers. The workers "think" it's static, but the factory handles the dynamics.
29/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned
30/// instance. It also moves NUM_WORKER calculations to 1.
31#[derive(Debug, Clone)]
32pub(crate) enum WorkerDiscardSettings {
33 None,
34 Static { limit: usize, mode: DiscardMode },
35}
36
37impl WorkerDiscardSettings {
38 pub(crate) fn update_worker_limit(&mut self, new_limit: usize) {
39 if let Self::Static { limit, .. } = self {
40 *limit = new_limit;
41 }
42 }
43
44 pub(crate) fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
45 match self {
46 Self::None => None,
47 Self::Static { limit, mode, .. } => Some((*limit, *mode)),
48 }
49 }
50}
51/// If a factory supports job discarding (loadshedding) it can have a few configurations
52/// which are defined in this enum. There is
53///
54/// 1. No discarding
55/// 2. A static queueing limit discarding, with a specific discarding mode
56/// 3. A dynamic queueing limit for discards, with a specified discarding mode and init discard limit.
57pub enum DiscardSettings {
58 /// Don't discard jobs
59 None,
60 /// A static, immutable limit
61 Static {
62 /// The current limit. If 0, means jobs will never queue and immediately be discarded
63 /// once all workers are busy
64 limit: usize,
65 /// Define the factory messaging discard mode denoting if the oldest or newest messages
66 /// should be discarded in back-pressure scenarios
67 ///
68 /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
69 mode: DiscardMode,
70 },
71 /// Dynamic discarding is where the discard limit can change over time, controlled
72 /// by the `updater` which is an implementation of the [DynamicDiscardController]
73 Dynamic {
74 /// The current limit. If 0, means jobs will never queue and immediately be discarded
75 /// once all workers are busy
76 limit: usize,
77 /// Define the factory messaging discard mode denoting if the oldest or newest messages
78 /// should be discarded in back-pressure scenarios
79 ///
80 /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
81 mode: DiscardMode,
82 /// The [DynamicDiscardController] implementation, which computes new limits dynamically
83 /// based on whatever metrics it wants
84 updater: Box<dyn DynamicDiscardController>,
85 },
86}
87
88impl std::fmt::Debug for DiscardSettings {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 match self {
91 DiscardSettings::None => {
92 write!(f, "DiscardSettings::None")
93 }
94 DiscardSettings::Static { limit, mode } => f
95 .debug_struct("DiscardSettings::Static")
96 .field("limit", limit)
97 .field("mode", mode)
98 .finish(),
99 DiscardSettings::Dynamic { limit, mode, .. } => f
100 .debug_struct("DiscardSettings::Dynamic")
101 .field("limit", limit)
102 .field("mode", mode)
103 .finish(),
104 }
105 }
106}
107
108impl DiscardSettings {
109 pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
110 match &self {
111 Self::None => WorkerDiscardSettings::None,
112 Self::Static { limit, mode } => WorkerDiscardSettings::Static {
113 limit: *limit,
114 mode: *mode,
115 },
116 Self::Dynamic { limit, mode, .. } => WorkerDiscardSettings::Static {
117 limit: *limit,
118 mode: *mode,
119 },
120 }
121 }
122
123 /// Retrieve the discarding limit and [DiscardMode], if configured
124 pub fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
125 match self {
126 Self::None => None,
127 Self::Static { limit, mode, .. } => Some((*limit, *mode)),
128 Self::Dynamic { limit, mode, .. } => Some((*limit, *mode)),
129 }
130 }
131}
132
133/// Controls the dynamic concurrency level by receiving periodic snapshots of job statistics
134/// and emitting a new concurrency limit
135#[cfg_attr(feature = "async-trait", crate::async_trait)]
136pub trait DynamicDiscardController: Send + Sync + 'static {
137 /// Compute the new threshold for discarding
138 ///
139 /// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
140 /// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
141 /// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
142 /// counters)
143 ///
144 /// The namespace of stats collected on the base controller factory are
145 /// `base_controller.factory.{FACTORY_NAME}.{STAT}`
146 ///
147 /// If no factory name is set, then "all" will be inserted
148 #[cfg(feature = "async-trait")]
149 async fn compute(&mut self, current_threshold: usize) -> usize;
150
151 /// Compute the new threshold for discarding
152 ///
153 /// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
154 /// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
155 /// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
156 /// counters)
157 ///
158 /// The namespace of stats collected on the base controller factory are
159 /// `base_controller.factory.{FACTORY_NAME}.{STAT}`
160 ///
161 /// If no factory name is set, then "all" will be inserted
162 #[cfg(not(feature = "async-trait"))]
163 fn compute(&mut self, current_threshold: usize) -> futures::future::BoxFuture<'_, usize>;
164}
165
166/// Reason for discarding a job
167#[derive(Debug, Eq, PartialEq, Clone)]
168pub enum DiscardReason {
169 /// The job TTLd
170 TtlExpired,
171 /// The job was rejected or dropped due to loadshedding
172 Loadshed,
173 /// The job was dropped due to factory shutting down
174 Shutdown,
175 /// The job was rejected due to rate limits being exceeded
176 RateLimited,
177}
178
179/// Trait defining the discard handler for a factory.
180pub trait DiscardHandler<TKey, TMsg>: Send + Sync + 'static
181where
182 TKey: JobKey,
183 TMsg: Message,
184{
185 /// Called on a job prior to being dropped from the factory.
186 ///
187 /// Useful scenarios are (a) collecting metrics, (b) logging, (c) tearing
188 /// down resources in the job, etc.
189 fn discard(&self, reason: DiscardReason, job: &mut Job<TKey, TMsg>);
190}