1use std::collections::HashMap;
9use std::marker::PhantomData;
10
11use crate::factory::worker::WorkerProperties;
12use crate::factory::Job;
13use crate::factory::JobKey;
14use crate::factory::WorkerId;
15use crate::ActorProcessingErr;
16use crate::Message;
17use crate::State;
18
/// Strategy object that decides which worker slot a job key maps to.
///
/// [`CustomRouting`] calls [`CustomHashFunction::hash`] once per routed job and
/// uses the returned value directly as the id of the target worker.
pub trait CustomHashFunction<TKey: Send + Sync + 'static>: Send + Sync {
    /// Map `key` onto a worker slot.
    ///
    /// * `key` - The key of the job being routed.
    /// * `worker_count` - Number of workers to spread keys over;
    ///   [`CustomRouting`] passes the `pool_size` it was given.
    ///
    /// Returns the id of the worker that should receive the job. The value is
    /// presumably expected to lie in `0..worker_count`: [`CustomRouting`] does
    /// not clamp it, and an id without a matching pool entry means the job is
    /// never enqueued.
    fn hash(&self, key: &TKey, worker_count: usize) -> usize;
}
27
28#[derive(Debug)]
30pub enum RouteResult<TKey, TMsg>
31where
32 TKey: JobKey,
33 TMsg: Message,
34{
35 Handled,
37 Backlog(Job<TKey, TMsg>),
40 RateLimited(Job<TKey, TMsg>),
47}
48
49pub trait Router<TKey, TMsg>: State
52where
53 TKey: JobKey,
54 TMsg: Message,
55{
56 fn route_message(
68 &mut self,
69 job: Job<TKey, TMsg>,
70 pool_size: usize,
71 worker_hint: Option<WorkerId>,
72 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
73 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr>;
74
75 fn choose_target_worker(
94 &mut self,
95 job: &Job<TKey, TMsg>,
96 pool_size: usize,
97 worker_hint: Option<WorkerId>,
98 worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
99 ) -> Option<WorkerId>;
100
101 fn is_factory_queueing(&self) -> bool;
104}
105
/// Generates a stateless routing-mode marker type.
///
/// `impl_routing_mode! {Name, "docs"}` expands to
/// * `pub struct Name<TKey, TMsg>` which carries `docs` as its rustdoc,
///   derives `Debug` and holds nothing but `PhantomData` markers, and
/// * a `Default` implementation for that struct.
///
/// The `Router` implementation is not generated here; it is written by hand
/// for every routing mode.
macro_rules! impl_routing_mode {
    ($mode:ident, $docs:expr) => {
        #[doc = $docs]
        #[derive(Debug)]
        pub struct $mode<TKey: JobKey, TMsg: Message> {
            // Markers only: no `TKey` / `TMsg` value is ever stored.
            _key: PhantomData<fn() -> TKey>,
            _msg: PhantomData<fn() -> TMsg>,
        }

        impl<TKey: JobKey, TMsg: Message> Default for $mode<TKey, TMsg> {
            fn default() -> Self {
                Self { _key: PhantomData, _msg: PhantomData }
            }
        }
    };
}
134
135impl_routing_mode! {KeyPersistentRouting, "Factory will select worker by hashing the job's key.
137Workers will have jobs placed into their incoming message queue's"}
138
139impl<TKey, TMsg> Router<TKey, TMsg> for KeyPersistentRouting<TKey, TMsg>
140where
141 TKey: JobKey,
142 TMsg: Message,
143{
144 fn route_message(
145 &mut self,
146 job: Job<TKey, TMsg>,
147 pool_size: usize,
148 worker_hint: Option<WorkerId>,
149 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
150 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
151 if let Some(worker) = self
152 .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
153 .and_then(|wid| worker_pool.get_mut(&wid))
154 {
155 worker.enqueue_job(job)?;
156 }
157 Ok(RouteResult::Handled)
158 }
159
160 fn choose_target_worker(
161 &mut self,
162 job: &Job<TKey, TMsg>,
163 pool_size: usize,
164 worker_hint: Option<WorkerId>,
165 _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
166 ) -> Option<WorkerId> {
167 let key =
168 worker_hint.unwrap_or_else(|| crate::factory::hash::hash_with_max(&job.key, pool_size));
169 Some(key)
170 }
171
172 fn is_factory_queueing(&self) -> bool {
173 false
174 }
175}
176
177impl_routing_mode! {QueuerRouting, "Factory will dispatch job to first available worker.
179Factory will maintain shared internal queue of messages"}
180
181impl<TKey, TMsg> Router<TKey, TMsg> for QueuerRouting<TKey, TMsg>
182where
183 TKey: JobKey,
184 TMsg: Message,
185{
186 fn route_message(
187 &mut self,
188 job: Job<TKey, TMsg>,
189 pool_size: usize,
190 worker_hint: Option<WorkerId>,
191 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
192 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
193 if let Some(worker) = self
194 .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
195 .and_then(|wid| worker_pool.get_mut(&wid))
196 {
197 worker.enqueue_job(job)?;
198 Ok(RouteResult::Handled)
199 } else {
200 Ok(RouteResult::Backlog(job))
201 }
202 }
203
204 fn choose_target_worker(
205 &mut self,
206 _job: &Job<TKey, TMsg>,
207 _pool_size: usize,
208 worker_hint: Option<WorkerId>,
209 worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
210 ) -> Option<WorkerId> {
211 if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
212 if worker.is_available() {
213 return worker_hint;
214 }
215 }
216 worker_pool
217 .iter()
218 .find(|(_, worker)| worker.is_available())
219 .map(|(wid, _)| *wid)
220 }
221
222 fn is_factory_queueing(&self) -> bool {
223 true
224 }
225}
226
227impl_routing_mode! {StickyQueuerRouting, "Factory will dispatch jobs to a worker that is processing the same key (if any).
229Factory will maintain shared internal queue of messages.
230
231Note: This is helpful for sharded db access style scenarios. If a worker is
232currently doing something on a given row id for example, we want subsequent updates
233to land on the same worker so it can serialize updates to the same row consistently."}
234
235impl<TKey, TMsg> Router<TKey, TMsg> for StickyQueuerRouting<TKey, TMsg>
236where
237 TKey: JobKey,
238 TMsg: Message,
239{
240 fn route_message(
241 &mut self,
242 job: Job<TKey, TMsg>,
243 pool_size: usize,
244 worker_hint: Option<WorkerId>,
245 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
246 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
247 if let Some(worker) = self
248 .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
249 .and_then(|wid| worker_pool.get_mut(&wid))
250 {
251 worker.enqueue_job(job)?;
252 Ok(RouteResult::Handled)
253 } else {
254 Ok(RouteResult::Backlog(job))
255 }
256 }
257
258 fn choose_target_worker(
259 &mut self,
260 job: &Job<TKey, TMsg>,
261 _pool_size: usize,
262 worker_hint: Option<WorkerId>,
263 worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
264 ) -> Option<WorkerId> {
265 if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
267 if worker.is_processing_key(&job.key) {
268 return worker_hint;
269 }
270 }
271
272 let maybe_worker = worker_pool
273 .iter()
274 .find(|(_, worker)| worker.is_processing_key(&job.key))
275 .map(|(a, _)| *a);
276 if maybe_worker.is_some() {
277 return maybe_worker;
278 }
279
280 if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
282 if worker.is_available() {
283 return worker_hint;
284 }
285 }
286
287 worker_pool
289 .iter()
290 .find(|(_, worker)| worker.is_available())
291 .map(|(wid, _)| *wid)
292 }
293
294 fn is_factory_queueing(&self) -> bool {
295 true
296 }
297}
298
299#[derive(Debug)]
304pub struct RoundRobinRouting<TKey, TMsg>
305where
306 TKey: JobKey,
307 TMsg: Message,
308{
309 _key: PhantomData<fn() -> TKey>,
310 _msg: PhantomData<fn() -> TMsg>,
311 last_worker: WorkerId,
312}
313
314impl<TKey, TMsg> Default for RoundRobinRouting<TKey, TMsg>
315where
316 TKey: JobKey,
317 TMsg: Message,
318{
319 fn default() -> Self {
320 Self {
321 _key: PhantomData,
322 _msg: PhantomData,
323 last_worker: 0,
324 }
325 }
326}
327
328impl<TKey, TMsg> Router<TKey, TMsg> for RoundRobinRouting<TKey, TMsg>
329where
330 TKey: JobKey,
331 TMsg: Message,
332{
333 fn route_message(
334 &mut self,
335 job: Job<TKey, TMsg>,
336 pool_size: usize,
337 worker_hint: Option<WorkerId>,
338 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
339 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
340 if let Some(worker) = self
341 .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
342 .and_then(|wid| worker_pool.get_mut(&wid))
343 {
344 worker.enqueue_job(job)?;
345 }
346 Ok(RouteResult::Handled)
347 }
348
349 fn choose_target_worker(
350 &mut self,
351 _job: &Job<TKey, TMsg>,
352 pool_size: usize,
353 worker_hint: Option<WorkerId>,
354 worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
355 ) -> Option<WorkerId> {
356 if let Some(worker) = worker_hint.and_then(|worker| worker_pool.get(&worker)) {
357 if worker.is_available() {
358 return worker_hint;
359 }
360 }
361
362 let mut key = self.last_worker + 1;
363 if key >= pool_size {
364 key = 0;
365 }
366 self.last_worker = key;
367 Some(key)
368 }
369
370 fn is_factory_queueing(&self) -> bool {
371 false
372 }
373}
374
375#[derive(Debug)]
381pub struct CustomRouting<TKey, TMsg, THasher>
382where
383 TKey: JobKey,
384 TMsg: Message,
385 THasher: CustomHashFunction<TKey>,
386{
387 _key: PhantomData<fn() -> TKey>,
388 _msg: PhantomData<fn() -> TMsg>,
389 hasher: THasher,
390}
391
392impl<TKey, TMsg, THasher> CustomRouting<TKey, TMsg, THasher>
393where
394 TKey: JobKey,
395 TMsg: Message,
396 THasher: CustomHashFunction<TKey>,
397{
398 pub fn new(hasher: THasher) -> Self {
400 Self {
401 _key: PhantomData,
402 _msg: PhantomData,
403 hasher,
404 }
405 }
406}
407
408impl<TKey, TMsg, THasher> Router<TKey, TMsg> for CustomRouting<TKey, TMsg, THasher>
409where
410 TKey: JobKey,
411 TMsg: Message,
412 THasher: CustomHashFunction<TKey> + 'static,
413{
414 fn route_message(
415 &mut self,
416 job: Job<TKey, TMsg>,
417 pool_size: usize,
418 worker_hint: Option<WorkerId>,
419 worker_pool: &mut HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
420 ) -> Result<RouteResult<TKey, TMsg>, ActorProcessingErr> {
421 if let Some(worker) = self
422 .choose_target_worker(&job, pool_size, worker_hint, worker_pool)
423 .and_then(|wid| worker_pool.get_mut(&wid))
424 {
425 worker.enqueue_job(job)?;
426 }
427 Ok(RouteResult::Handled)
428 }
429
430 fn choose_target_worker(
431 &mut self,
432 job: &Job<TKey, TMsg>,
433 pool_size: usize,
434 _worker_hint: Option<WorkerId>,
435 _worker_pool: &HashMap<WorkerId, WorkerProperties<TKey, TMsg>>,
436 ) -> Option<WorkerId> {
437 let key = self.hasher.hash(&job.key, pool_size);
438 Some(key)
439 }
440
441 fn is_factory_queueing(&self) -> bool {
442 false
443 }
444}