// ractor/factory/routing.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Routing protocols for Factories
7
8use std::collections::HashMap;
9use std::marker::PhantomData;
10
11use crate::factory::worker::WorkerProperties;
12use crate::factory::Job;
13use crate::factory::JobKey;
14use crate::factory::WorkerId;
15use crate::ActorProcessingErr;
16use crate::Message;
17use crate::State;
18
19/// Custom hashing behavior for factory routing to workers
/// Custom hashing behavior for factory routing to workers
pub trait CustomHashFunction<TKey>: Send + Sync
where
    TKey: Send + Sync + 'static,
{
    /// Hash the key into the space `[0, worker_count)`, identifying the
    /// worker which should receive jobs carrying this key.
    ///
    /// * `key` - The job key to hash
    /// * `worker_count` - The number of active workers in the pool
    ///
    /// Returns the target worker's index. Implementations should return a
    /// value strictly less than `worker_count`; an out-of-range value causes
    /// the job to be dropped by the router (no matching worker in the pool).
    fn hash(&self, key: &TKey, worker_count: usize) -> usize;
}
27
28/// The possible results from a routing operation.
/// The possible results from a routing operation.
#[derive(Debug)]
pub enum RouteResult<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// The job has been handled and routed successfully
    Handled,
    /// The job needs to be backlogged into the internal factory's queue (if
    /// configured). Carries the job back to the caller since it was not
    /// delivered to any worker.
    Backlog(Job<TKey, TMsg>),
    /// The job has exceeded the internal rate limit specification of the router.
    /// This would be returned as a route operation in the event that the router is
    /// tracking the jobs-per-unit-time and has decided that routing this next job
    /// would exceed that limit.
    ///
    /// Returns the job that was rejected
    RateLimited(Job<TKey, TMsg>),
}
48
49/// A routing mode controls how a request is routed from the factory to a
50/// designated worker
/// A routing mode controls how a request is routed from the factory to a
/// designated worker
pub trait Router<TKey, TMsg>: State
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Route a [Job] based on the specific routing methodology
    ///
    /// * `job` - The job to be routed
    /// * `pool_size` - The size of the ACTIVE worker pool (excluding draining workers)
    /// * `worker_hint` - If provided, this is a "hint" at which worker should receive the job,
    ///   if available.
    /// * `worker_pool` - The current worker pool, which may contain draining workers
    ///
    /// Returns [RouteResult::Handled] if the job was routed successfully, otherwise
    /// [RouteResult::Backlog] is returned indicating that the job should be enqueued in
    /// the factory's internal queue.
    fn route_message(
        &mut self,
        job: Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr>;

    /// Identifies if a job CAN be routed, and to which worker, without
    /// requiring dequeueing the job
    ///
    /// This prevents the need to support pushing jobs that have been dequeued,
    /// but no worker is available to accept the job, back into the front of the
    /// queue. And given the single-threaded nature of a Factory, this is safe
    /// to call outside of a locked context. It is assumed that if this returns
    /// [Some(WorkerId)], then the job is guaranteed to be routed, as internal state to
    /// the router may be updated.
    ///
    /// * `job` - A reference to the job to be routed
    /// * `pool_size` - The size of the ACTIVE worker pool (excluding draining workers)
    /// * `worker_hint` - If provided, this is a "hint" at which worker should receive the job,
    ///   if available.
    /// * `worker_pool` - The current worker pool, which may contain draining workers
    ///
    /// Returns [None] if no worker can be identified or no worker is available to accept
    /// the job, otherwise [Some(WorkerId)] indicating the target worker is returned
    fn choose_target_worker(
        &mut self,
        job: &Job<TKey, TMsg>,
        pool_size: usize,
        worker_hint: Option<WorkerId>,
        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
    ) -> Option<WorkerId>;

    /// Returns a flag indicating if the factory does discard/overload management ([true])
    /// or if it is handled by the worker(s) ([false])
    fn is_factory_queueing(&self) -> bool;
}
105
106// ============================ Macros ======================= //
// Generates a stateless (zero-sized) routing-mode struct carrying the supplied
// rustdoc string, together with a `Default` impl. The `PhantomData<fn() -> T>`
// fields bind the struct to its key/message type parameters without storing
// any values of those types.
macro_rules! impl_routing_mode {
    ($routing_mode: ident, $doc:expr) => {
        #[doc = $doc]
        #[derive(Debug)]
        pub struct $routing_mode<TKey, TMsg>
        where
            TKey: JobKey,
            TMsg: Message,
        {
            // Type-parameter markers only; no runtime state.
            _key: PhantomData<fn() -> TKey>,
            _msg: PhantomData<fn() -> TMsg>,
        }

        impl<TKey, TMsg> Default for $routing_mode<TKey, TMsg>
        where
            TKey: JobKey,
            TMsg: Message,
        {
            fn default() -> Self {
                Self {
                    _key: PhantomData,
                    _msg: PhantomData,
                }
            }
        }
    };
}
134
135// ============================ Key Persistent routing ======================= //
// Stateless router: generated via the shared macro above.
impl_routing_mode! {KeyPersistentRouting, "Factory will select worker by hashing the job's key.
Workers will have jobs placed into their incoming message queue's"}
138
139impl<TKey, TMsg> Router<TKey, TMsg> for KeyPersistentRouting<TKey, TMsg>
140where
141    TKey: JobKey,
142    TMsg: Message,
143{
144    fn route_message(
145        &mut self,
146        job: Job<TKey, TMsg>,
147        pool_size: usize,
148        worker_hint: Option<WorkerId>,
149        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
150    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
151        if let Some(worker) = self
152            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
153            .and_then(|wid| worker_pool.get_mut(&wid))
154        {
155            worker.enqueue_job(job)?;
156        }
157        Ok(RouteResult::Handled)
158    }
159
160    fn choose_target_worker(
161        &mut self,
162        job: &Job<TKey, TMsg>,
163        pool_size: usize,
164        worker_hint: Option<WorkerId>,
165        _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
166    ) -> Option<WorkerId> {
167        let key =
168            worker_hint.unwrap_or_else(|| crate::factory::hash::hash_with_max(&job.key, pool_size));
169        Some(key)
170    }
171
172    fn is_factory_queueing(&self) -> bool {
173        false
174    }
175}
176
177// ============================ Queuer routing ======================= //
// Stateless router: generated via the shared macro above.
impl_routing_mode! {QueuerRouting, "Factory will dispatch job to first available worker.
Factory will maintain shared internal queue of messages"}
180
181impl<TKey, TMsg> Router<TKey, TMsg> for QueuerRouting<TKey, TMsg>
182where
183    TKey: JobKey,
184    TMsg: Message,
185{
186    fn route_message(
187        &mut self,
188        job: Job<TKey, TMsg>,
189        pool_size: usize,
190        worker_hint: Option<WorkerId>,
191        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
192    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
193        if let Some(worker) = self
194            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
195            .and_then(|wid| worker_pool.get_mut(&wid))
196        {
197            worker.enqueue_job(job)?;
198            Ok(RouteResult::Handled)
199        } else {
200            Ok(RouteResult::Backlog(job))
201        }
202    }
203
204    fn choose_target_worker(
205        &mut self,
206        _job: &Job<TKey, TMsg>,
207        _pool_size: usize,
208        worker_hint: Option<WorkerId>,
209        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
210    ) -> Option<WorkerId> {
211        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
212            if worker.is_available() {
213                return worker_hint;
214            }
215        }
216        worker_pool
217            .iter()
218            .find(|(_, worker)| worker.is_available())
219            .map(|(wid, _)| *wid)
220    }
221
222    fn is_factory_queueing(&self) -> bool {
223        true
224    }
225}
226
227// ============================ Sticky Queuer routing ======================= //
// Stateless router: generated via the shared macro above.
impl_routing_mode! {StickyQueuerRouting, "Factory will dispatch jobs to a worker that is processing the same key (if any).
Factory will maintain shared internal queue of messages.

Note: This is helpful for sharded db access style scenarios. If a worker is
currently doing something on a given row id for example, we want subsequent updates
to land on the same worker so it can serialize updates to the same row consistently."}
234
235impl<TKey, TMsg> Router<TKey, TMsg> for StickyQueuerRouting<TKey, TMsg>
236where
237    TKey: JobKey,
238    TMsg: Message,
239{
240    fn route_message(
241        &mut self,
242        job: Job<TKey, TMsg>,
243        pool_size: usize,
244        worker_hint: Option<WorkerId>,
245        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
246    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
247        if let Some(worker) = self
248            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
249            .and_then(|wid| worker_pool.get_mut(&wid))
250        {
251            worker.enqueue_job(job)?;
252            Ok(RouteResult::Handled)
253        } else {
254            Ok(RouteResult::Backlog(job))
255        }
256    }
257
258    fn choose_target_worker(
259        &mut self,
260        job: &Job<TKey, TMsg>,
261        _pool_size: usize,
262        worker_hint: Option<WorkerId>,
263        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
264    ) -> Option<WorkerId> {
265        // check sticky first
266        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
267            if worker.is_processing_key(&job.key) {
268                return worker_hint;
269            }
270        }
271
272        let maybe_worker = worker_pool
273            .iter()
274            .find(|(_, worker)| worker.is_processing_key(&job.key))
275            .map(|(a, _)| *a);
276        if maybe_worker.is_some() {
277            return maybe_worker;
278        }
279
280        // now take first available, based on hint then brute-search
281        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
282            if worker.is_available() {
283                return worker_hint;
284            }
285        }
286
287        // fallback to first free worker as there's no sticky worker
288        worker_pool
289            .iter()
290            .find(|(_, worker)| worker.is_available())
291            .map(|(wid, _)| *wid)
292    }
293
294    fn is_factory_queueing(&self) -> bool {
295        true
296    }
297}
298
299// ============================ Round-robin routing ======================= //
/// Factory will dispatch to the next worker in order.
///
/// Workers will have jobs placed into their incoming message queues
#[derive(Debug)]
pub struct RoundRobinRouting<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    // Type-parameter markers only; no runtime values of TKey/TMsg are stored.
    _key: PhantomData<fn() -> TKey>,
    _msg: PhantomData<fn() -> TMsg>,
    // The worker that received the previous job; the next job targets
    // `last_worker + 1`, wrapping to 0 at the pool size.
    last_worker: WorkerId,
}
313
impl<TKey, TMsg> Default for RoundRobinRouting<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    fn default() -> Self {
        Self {
            _key: PhantomData,
            _msg: PhantomData,
            // Start the rotation at 0, so (absent a hint) the first routed
            // job targets worker 1 (or wraps to 0 for a single-worker pool).
            last_worker: 0,
        }
    }
}
327
328impl<TKey, TMsg> Router<TKey, TMsg> for RoundRobinRouting<TKey, TMsg>
329where
330    TKey: JobKey,
331    TMsg: Message,
332{
333    fn route_message(
334        &mut self,
335        job: Job<TKey, TMsg>,
336        pool_size: usize,
337        worker_hint: Option<WorkerId>,
338        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
339    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
340        if let Some(worker) = self
341            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
342            .and_then(|wid| worker_pool.get_mut(&wid))
343        {
344            worker.enqueue_job(job)?;
345        }
346        Ok(RouteResult::Handled)
347    }
348
349    fn choose_target_worker(
350        &mut self,
351        _job: &Job<TKey, TMsg>,
352        pool_size: usize,
353        worker_hint: Option<WorkerId>,
354        worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
355    ) -> Option<WorkerId> {
356        if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
357            if worker.is_available() {
358                return worker_hint;
359            }
360        }
361
362        let mut key = self.last_worker + 1;
363        if key >= pool_size {
364            key = 0;
365        }
366        self.last_worker = key;
367        Some(key)
368    }
369
370    fn is_factory_queueing(&self) -> bool {
371        false
372    }
373}
374
375// ============================ Custom routing ======================= //
376/// Factory will dispatch to workers based on a custom hash function.
377///
378/// The factory maintains no queue in this scenario, and jobs are pushed
379/// to worker's queues.
380#[derive(Debug)]
381pub struct CustomRouting<TKey, TMsg, THasher>
382where
383    TKey: JobKey,
384    TMsg: Message,
385    THasher: CustomHashFunction<TKey>,
386{
387    _key: PhantomData<fn() -> TKey>,
388    _msg: PhantomData<fn() -> TMsg>,
389    hasher: THasher,
390}
391
392impl<TKey, TMsg, THasher> CustomRouting<TKey, TMsg, THasher>
393where
394    TKey: JobKey,
395    TMsg: Message,
396    THasher: CustomHashFunction<TKey>,
397{
398    /// Construct a new [CustomRouting] instance with the supplied hash function
399    pub fn new(hasher: THasher) -> Self {
400        Self {
401            _key: PhantomData,
402            _msg: PhantomData,
403            hasher,
404        }
405    }
406}
407
408impl<TKey, TMsg, THasher> Router<TKey, TMsg> for CustomRouting<TKey, TMsg, THasher>
409where
410    TKey: JobKey,
411    TMsg: Message,
412    THasher: CustomHashFunction<TKey> + 'static,
413{
414    fn route_message(
415        &mut self,
416        job: Job<TKey, TMsg>,
417        pool_size: usize,
418        worker_hint: Option<WorkerId>,
419        worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
420    ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
421        if let Some(worker) = self
422            .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
423            .and_then(|wid| worker_pool.get_mut(&wid))
424        {
425            worker.enqueue_job(job)?;
426        }
427        Ok(RouteResult::Handled)
428    }
429
430    fn choose_target_worker(
431        &mut self,
432        job: &Job<TKey, TMsg>,
433        pool_size: usize,
434        _worker_hint: Option<WorkerId>,
435        _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
436    ) -> Option<WorkerId> {
437        let key = self.hasher.hash(&job.key, pool_size);
438        Some(key)
439    }
440
441    fn is_factory_queueing(&self) -> bool {
442        false
443    }
444}