Skip to main content

ractor/factory/
discard.rs

// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Discard handler managing when jobs are discarded
7use super::Job;
8use super::JobKey;
9use crate::Message;
10
/// The discard mode of a factory.
///
/// Selects which jobs are shed when the factory is under backpressure.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum DiscardMode {
    /// Discard the oldest incoming jobs under backpressure, i.e. the jobs at
    /// the head of the queue
    Oldest,
    /// Discard the newest incoming jobs under backpressure
    Newest,
}
19
20/// A worker's copy of the discard settings.
21///
22/// Originally we passed a cloned box of the dynamic calculator to the workers,
23/// but what that would do is have every worker re-compute the discard limit
24/// which if you have many thousands of workers is kind of useless.
25///
26/// Instead now we have the workers treat the limit as static, but have the
27/// factory compute the limit on it's interval and propagate the limit to the
28/// workers. The workers "think" it's static, but the factory handles the dynamics.
29/// This way the factory can keep the [DynamicDiscardHandler] as a single, uncloned
30/// instance. It also moves NUM_WORKER calculations to 1.
31#[derive(Debug, Clone)]
32pub(crate) enum WorkerDiscardSettings {
33    None,
34    Static { limit: usize, mode: DiscardMode },
35}
36
37impl WorkerDiscardSettings {
38    pub(crate) fn update_worker_limit(&mut self, new_limit: usize) {
39        if let Self::Static { limit, .. } = self {
40            *limit = new_limit;
41        }
42    }
43
44    pub(crate) fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
45        match self {
46            Self::None => None,
47            Self::Static { limit, mode, .. } => Some((*limit, *mode)),
48        }
49    }
50}
51/// If a factory supports job discarding (loadshedding) it can have a few configurations
52/// which are defined in this enum. There is
53///
54/// 1. No discarding
55/// 2. A static queueing limit discarding, with a specific discarding mode
56/// 3. A dynamic queueing limit for discards, with a specified discarding mode and init discard limit.
57pub enum DiscardSettings {
58    /// Don't discard jobs
59    None,
60    /// A static, immutable limit
61    Static {
62        /// The current limit. If 0, means jobs will never queue and immediately be discarded
63        /// once all workers are busy
64        limit: usize,
65        /// Define the factory messaging discard mode denoting if the oldest or newest messages
66        /// should be discarded in back-pressure scenarios
67        ///
68        /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
69        mode: DiscardMode,
70    },
71    /// Dynamic discarding is where the discard limit can change over time, controlled
72    /// by the `updater` which is an implementation of the [DynamicDiscardController]
73    Dynamic {
74        /// The current limit. If 0, means jobs will never queue and immediately be discarded
75        /// once all workers are busy
76        limit: usize,
77        /// Define the factory messaging discard mode denoting if the oldest or newest messages
78        /// should be discarded in back-pressure scenarios
79        ///
80        /// Default is [DiscardMode::Oldest], meaning discard jobs at the head of the queue
81        mode: DiscardMode,
82        /// The [DynamicDiscardController] implementation, which computes new limits dynamically
83        /// based on whatever metrics it wants
84        updater: Box<dyn DynamicDiscardController>,
85    },
86}
87
88impl std::fmt::Debug for DiscardSettings {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        match self {
91            DiscardSettings::None => {
92                write!(f, "DiscardSettings::None")
93            }
94            DiscardSettings::Static { limit, mode } => f
95                .debug_struct("DiscardSettings::Static")
96                .field("limit", limit)
97                .field("mode", mode)
98                .finish(),
99            DiscardSettings::Dynamic { limit, mode, .. } => f
100                .debug_struct("DiscardSettings::Dynamic")
101                .field("limit", limit)
102                .field("mode", mode)
103                .finish(),
104        }
105    }
106}
107
108impl DiscardSettings {
109    pub(crate) fn get_worker_settings(&self) -> WorkerDiscardSettings {
110        match &self {
111            Self::None => WorkerDiscardSettings::None,
112            Self::Static { limit, mode } => WorkerDiscardSettings::Static {
113                limit: *limit,
114                mode: *mode,
115            },
116            Self::Dynamic { limit, mode, .. } => WorkerDiscardSettings::Static {
117                limit: *limit,
118                mode: *mode,
119            },
120        }
121    }
122
123    /// Retrieve the discarding limit and [DiscardMode], if configured
124    pub fn get_limit_and_mode(&self) -> Option<(usize, DiscardMode)> {
125        match self {
126            Self::None => None,
127            Self::Static { limit, mode, .. } => Some((*limit, *mode)),
128            Self::Dynamic { limit, mode, .. } => Some((*limit, *mode)),
129        }
130    }
131}
132
133/// Controls the dynamic concurrency level by receiving periodic snapshots of job statistics
134/// and emitting a new concurrency limit
135#[cfg_attr(feature = "async-trait", crate::async_trait)]
136pub trait DynamicDiscardController: Send + Sync + 'static {
137    /// Compute the new threshold for discarding
138    ///
139    /// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
140    /// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
141    /// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
142    /// counters)
143    ///
144    /// The namespace of stats collected on the base controller factory are
145    /// `base_controller.factory.{FACTORY_NAME}.{STAT}`
146    ///
147    /// If no factory name is set, then "all" will be inserted
148    #[cfg(feature = "async-trait")]
149    async fn compute(&mut self, current_threshold: usize) -> usize;
150
151    /// Compute the new threshold for discarding
152    ///
153    /// If you want to utilize metrics exposed in [crate::factory::stats] you can gather them
154    /// by utilizing `stats_facebook::service_data::get_service_data_singleton` to retrieve a
155    /// accessor to `ServiceData` which you can then resolve stats by name (either timeseries or
156    /// counters)
157    ///
158    /// The namespace of stats collected on the base controller factory are
159    /// `base_controller.factory.{FACTORY_NAME}.{STAT}`
160    ///
161    /// If no factory name is set, then "all" will be inserted
162    #[cfg(not(feature = "async-trait"))]
163    fn compute(&mut self, current_threshold: usize) -> futures::future::BoxFuture<'_, usize>;
164}
165
/// Reason for discarding a job
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum DiscardReason {
    /// The job's time-to-live (TTL) expired
    TtlExpired,
    /// The job was rejected or dropped due to loadshedding
    Loadshed,
    /// The job was dropped due to the factory shutting down
    Shutdown,
    /// The job was rejected due to rate limits being exceeded
    RateLimited,
}
178
179/// Trait defining the discard handler for a factory.
180pub trait DiscardHandler<TKey, TMsg>: Send + Sync + 'static
181where
182    TKey: JobKey,
183    TMsg: Message,
184{
185    /// Called on a job prior to being dropped from the factory.
186    ///
187    /// Useful scenarios are (a) collecting metrics, (b) logging, (c) tearing
188    /// down resources in the job, etc.
189    fn discard(&self, reason: DiscardReason, job: &mut Job<TKey, TMsg>);
190}