ractor/factory/
worker.rs

1// Copyright (c) Sean Lawlor
2//
3// This source code is licensed under both the MIT license found in the
4// LICENSE-MIT file in the root directory of this source tree.
5
6//! Factory worker properties
7
8use std::collections::HashMap;
9use std::collections::VecDeque;
10use std::fmt::Debug;
11#[cfg(not(feature = "async-trait"))]
12use std::future::Future;
13use std::sync::Arc;
14
15use bon::Builder;
16use tracing::Instrument;
17
18use super::discard::DiscardMode;
19use super::discard::WorkerDiscardSettings;
20use super::stats::FactoryStatsLayer;
21use super::DiscardHandler;
22use super::DiscardReason;
23use super::FactoryMessage;
24use super::Job;
25use super::JobKey;
26use super::JobOptions;
27use super::WorkerId;
28use crate::concurrency::Duration;
29use crate::concurrency::Instant;
30use crate::concurrency::JoinHandle;
31use crate::Actor;
32use crate::ActorCell;
33use crate::ActorId;
34use crate::ActorProcessingErr;
35use crate::ActorRef;
36use crate::Message;
37use crate::MessagingErr;
38use crate::SupervisionEvent;
39
/// The configuration for the dead-man's switch functionality, which
/// detects and optionally restarts workers that have stopped responding.
#[derive(Builder, Debug)]
pub struct DeadMansSwitchConfiguration {
    /// Duration before determining that a worker is stuck
    pub detection_timeout: Duration,
    /// Flag denoting if the stuck worker should be killed
    /// and restarted
    ///
    /// Default = `true`
    #[builder(default = true)]
    pub kill_worker: bool,
}
52
/// A factory worker trait, which is a basic wrapper around
/// actor logic, with predefined type information specific to workers
///
/// IMPORTANT: Workers are actors at their core, but with
/// somewhat customized logic. This logic assists in tracking worker health,
/// processing messages in a load-balanced manner, and managing the necessary
/// startup automatically without copying the code repeatedly.
///
/// This trait implements as much of the custom wrapping logic as possible
/// without breaking the factory <-> worker API requirement. If you so wish
/// you can fully specify the actor properties instead of using this
/// assistance trait.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait Worker: Send + Sync + 'static {
    /// The worker's job-key type
    type Key: JobKey;
    /// The worker's message type
    type Message: Message;
    /// The optional startup arguments for the worker (use `()` to ignore)
    type Arguments: Message;
    /// The worker's internal state
    type State: crate::State;

    /// Invoked when a worker is being started by the system.
    ///
    /// Any initialization inherent to the actor's role should be
    /// performed here hence why it returns the initial state.
    ///
    /// Panics in `pre_start` do not invoke the
    /// supervision strategy and the actor won't be started. The `spawn`
    /// will return an error to the caller
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `args` - Arguments that are passed in the spawning of the worker which are
    ///   necessary to construct the initial state
    ///
    /// Returns an initial [Worker::State] to bootstrap the actor
    #[cfg(not(feature = "async-trait"))]
    fn pre_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        args: Self::Arguments,
    ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;

    /// Invoked when a worker is being started by the system.
    ///
    /// Any initialization inherent to the actor's role should be
    /// performed here hence why it returns the initial state.
    ///
    /// Panics in `pre_start` do not invoke the
    /// supervision strategy and the actor won't be started. The `spawn`
    /// will return an error to the caller
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `args` - Arguments that are passed in the spawning of the worker which are
    ///   necessary to construct the initial state
    ///
    /// Returns an initial [Worker::State] to bootstrap the actor
    #[cfg(feature = "async-trait")]
    async fn pre_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr>;

    /// Invoked after an actor has started.
    ///
    /// Any post initialization can be performed here, such as writing
    /// to a log file, emitting metrics.
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Panics in `post_start` follow the supervision strategy.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn post_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async { Ok(()) }
    }
    /// Invoked after an actor has started.
    ///
    /// Any post initialization can be performed here, such as writing
    /// to a log file, emitting metrics.
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Panics in `post_start` follow the supervision strategy.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn post_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        Ok(())
    }

    /// Invoked after an actor has been stopped to perform final cleanup. In the
    /// event the actor is terminated with killed or has self-panicked,
    /// `post_stop` won't be called.
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Panics in `post_stop` follow the supervision strategy.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn post_stop(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async { Ok(()) }
    }
    /// Invoked after an actor has been stopped to perform final cleanup. In the
    /// event the actor is terminated with killed or has self-panicked,
    /// `post_stop` won't be called.
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Panics in `post_stop` follow the supervision strategy.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn post_stop(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        Ok(())
    }

    /// Handle the incoming message from the event processing loop. Unhandled panics will be
    /// captured and sent to the supervisor(s)
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `job` - The [Job] which this worker should process
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Returns the [Job::key] upon success or the error on failure
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn handle(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        job: Job<Self::Key, Self::Message>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<Self::Key, ActorProcessingErr>> + Send {
        async { Ok(job.key) }
    }

    /// Handle the incoming message from the event processing loop. Unhandled panics will be
    /// captured and sent to the supervisor(s)
    ///
    /// * `wid` - The id of this worker in the factory
    /// * `factory` - The handle to the factory that owns and manages this worker
    /// * `job` - The [Job] which this worker should process
    /// * `state` - The worker's internal state, which is mutable and owned by the worker
    ///
    /// Returns the [Job::key] upon success or the error on failure
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn handle(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        job: Job<Self::Key, Self::Message>,
        state: &mut Self::State,
    ) -> Result<Self::Key, ActorProcessingErr> {
        Ok(job.key)
    }

    /// Handle the incoming supervision event. Unhandled panics will be captured and
    /// sent to the supervisor(s). The default supervision behavior is to exit the
    /// supervisor on any child exit. To override this behavior, implement this function.
    ///
    /// * `myself` - A reference to this actor's ActorCell
    /// * `message` - The message to process
    /// * `state` - A mutable reference to the internal actor's state
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn handle_supervisor_evt(
        &self,
        myself: ActorCell,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async move {
            match message {
                SupervisionEvent::ActorTerminated(who, _, _)
                | SupervisionEvent::ActorFailed(who, _) => {
                    myself.stop(None);
                }
                _ => {}
            }
            Ok(())
        }
    }
    /// Handle the incoming supervision event. Unhandled panics will be captured and
    /// sent to the supervisor(s). The default supervision behavior is to exit the
    /// supervisor on any child exit. To override this behavior, implement this function.
    ///
    /// * `myself` - A reference to this actor's ActorCell
    /// * `message` - The message to process
    /// * `state` - A mutable reference to the internal actor's state
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn handle_supervisor_evt(
        &self,
        myself: ActorCell,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            SupervisionEvent::ActorTerminated(who, _, _)
            | SupervisionEvent::ActorFailed(who, _) => {
                myself.stop(None);
            }
            _ => {}
        }
        Ok(())
    }
}
295
/// The inner state of the wrapped [Worker] but held privately in trust
/// for the [Worker] implementation
#[doc(hidden)]
pub struct WorkerState<TWorker: Worker> {
    // Handle to the owning factory; used to send pong/finished replies.
    factory: ActorRef<FactoryMessage<TWorker::Key, TWorker::Message>>,
    // This worker's identifier within the factory's pool.
    wid: WorkerId,
    // The user-defined [Worker::State], passed mutably to the worker's hooks.
    state: TWorker::State,
}
304
305impl<TWorker: Worker> std::fmt::Debug for WorkerState<TWorker> {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        write!(f, "WorkerState")
308    }
309}
310
// Blanket implementation: every [Worker] is automatically an [Actor] whose
// message type is the factory's worker envelope ([WorkerMessage]). Each
// lifecycle hook unpacks the factory context (wid + factory handle) held in
// [WorkerState] and delegates to the matching [Worker] method.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl<T> Actor for T
where
    T: Worker,
{
    type Msg = WorkerMessage<<Self as Worker>::Key, <Self as Worker>::Message>;
    type Arguments = WorkerStartContext<
        <Self as Worker>::Key,
        <Self as Worker>::Message,
        <Self as Worker>::Arguments,
    >;
    type State = WorkerState<Self>;

    async fn pre_start(
        &self,
        _: ActorRef<Self::Msg>,
        WorkerStartContext {
            wid,
            factory,
            custom_start,
        }: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr> {
        // Run the user's startup logic, then stash the factory handle and wid
        // alongside the returned user state for use in the later hooks.
        let inner_state = <Self as Worker>::pre_start(self, wid, &factory, custom_start).await?;
        Ok(Self::State {
            wid,
            factory,
            state: inner_state,
        })
    }

    async fn post_start(
        &self,
        _: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as Worker>::post_start(self, state.wid, &state.factory, &mut state.state).await
    }

    async fn post_stop(
        &self,
        _: ActorRef<Self::Msg>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as Worker>::post_stop(self, state.wid, &state.factory, &mut state.state).await
    }

    async fn handle(
        &self,
        _: ActorRef<Self::Msg>,
        message: Self::Msg,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        match message {
            WorkerMessage::FactoryPing(time) => {
                tracing::trace!("Worker {} - ping", state.wid);

                // Reply immediately with the elapsed time so the factory can
                // track this worker's responsiveness.
                state
                    .factory
                    .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
                Ok(())
            }
            WorkerMessage::Dispatch(mut job) => {
                // If the job carries a tracing span, run the user handler
                // instrumented within it; otherwise run it plainly.
                let key = if let Some(span) = job.options.take_span() {
                    <Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
                        .instrument(span)
                        .await
                } else {
                    <Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
                        .await
                }?;
                // Tell the factory the job is done so it can dispatch the next one.
                state
                    .factory
                    .cast(FactoryMessage::Finished(state.wid, key))?;
                Ok(())
            }
        }
    }

    async fn handle_supervisor_evt(
        &self,
        myself: ActorRef<Self::Msg>,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        <Self as Worker>::handle_supervisor_evt(self, myself.into(), message, &mut state.state)
            .await
    }
}
399
/// The [super::Factory] is responsible for spawning workers
/// and re-spawning workers under failure scenarios. This means that
/// it needs to understand how to build workers. The [WorkerBuilder]
/// trait is used by the factory to construct new workers when needed
/// (both at startup and when replacing a failed worker).
pub trait WorkerBuilder<TWorker, TWorkerStart>: Send + Sync
where
    TWorker: Actor,
    TWorkerStart: Message,
{
    /// Build a new worker
    ///
    /// * `wid`: The worker's "id" or index in the worker pool
    ///
    /// Returns a tuple of the worker and a custom startup definition giving the worker
    /// owned control of some structs that it may need to work.
    fn build(&mut self, wid: WorkerId) -> (TWorker, TWorkerStart);
}
417
/// Controls the size of the worker pool by dynamically growing/shrinking the pool
/// to the requested size
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait WorkerCapacityController: 'static + Send + Sync {
    /// Retrieve the new pool size
    ///
    /// * `current` - The current pool size
    ///
    /// Returns the "new" pool size. If 0 is returned, the adjustment
    /// will be ignored
    #[cfg(feature = "async-trait")]
    async fn get_pool_size(&mut self, current: usize) -> usize;

    /// Retrieve the new pool size
    ///
    /// * `current` - The current pool size
    ///
    /// Returns the "new" pool size. If 0 is returned, the adjustment
    /// will be ignored
    #[cfg(not(feature = "async-trait"))]
    fn get_pool_size(&mut self, current: usize) -> futures::future::BoxFuture<'_, usize>;
}
440
/// Message to a worker
#[derive(Debug)]
pub enum WorkerMessage<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// A ping from the factory. The worker should send a [super::FactoryMessage::WorkerPong] reply
    /// as soon as received back to the [super::Factory], forwarding this instant value to
    /// track timing information.
    FactoryPing(Instant),
    /// A job is dispatched to the worker. Once the worker is complete with processing, it should
    /// reply with [super::FactoryMessage::Finished] to the [super::Factory] supplying its
    /// WID and the job key to signify that the job is completed processing and the worker is
    /// available for a new job
    Dispatch(Job<TKey, TMsg>),
}
458
// Marker impl so [WorkerMessage] satisfies the cluster-aware `Message` trait.
// The body is empty: it relies on the trait's default implementations
// (NOTE(review): presumably worker messages are local-only and never
// serialized across the cluster — confirm against the `Message` trait defaults).
#[cfg(feature = "cluster")]
impl<TKey, TMsg> crate::Message for WorkerMessage<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
}
466
/// Startup context data (`Arguments`) which are passed to a worker on start
#[derive(Debug)]
pub struct WorkerStartContext<TKey, TMsg, TCustomStart>
where
    TKey: JobKey,
    TMsg: Message,
    TCustomStart: Message,
{
    /// The worker's identifier (its index in the factory's pool)
    pub wid: WorkerId,

    /// The factory the worker belongs to
    pub factory: ActorRef<FactoryMessage<TKey, TMsg>>,

    /// Custom startup arguments to the worker, produced by the
    /// [WorkerBuilder] alongside the worker itself
    pub custom_start: TCustomStart,
}
484
/// Properties of a worker, as tracked by the factory (queue, discard
/// configuration, ping/liveliness bookkeeping, and in-flight job accounting)
pub struct WorkerProperties<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Worker identifier
    pub(crate) wid: WorkerId,

    /// Worker actor
    pub(crate) actor: ActorRef<WorkerMessage<TKey, TMsg>>,

    /// Name of the factory that owns this worker
    factory_name: String,

    /// The join handle for the worker
    handle: Option<JoinHandle<()>>,

    /// Worker's message queue
    message_queue: VecDeque<Job<TKey, TMsg>>,

    /// Maximum queue length. Any job arriving when the queue is at its max
    /// length causes a job to be discarded — which one depends on the
    /// configured [DiscardMode] (oldest at the head, or the newly arrived one).
    ///
    /// Default is [WorkerDiscardSettings::None]
    pub(crate) discard_settings: WorkerDiscardSettings,

    /// A function to be called for each job to be dropped.
    pub(crate) discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,

    /// Flag indicating if this worker has a ping currently pending
    is_ping_pending: bool,

    /// Instant used for stuck-worker detection (compared against a timeout
    /// in `is_stuck`); set at construction and on worker replacement
    last_ping: Instant,

    /// Statistics for the worker
    stats: Option<Arc<dyn FactoryStatsLayer>>,

    /// Current pending jobs dispatched to the worker (for tracking stats)
    curr_jobs: HashMap<TKey, JobOptions>,

    /// Flag indicating if this worker is currently "draining" work due to resizing
    pub(crate) is_draining: bool,
}
530
531impl<TKey, TMsg> Debug for WorkerProperties<TKey, TMsg>
532where
533    TKey: JobKey,
534    TMsg: Message,
535{
536    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537        f.debug_struct("WorkerProperties")
538            .field("wid", &self.wid)
539            .field("actor", &self.actor)
540            .field("factory_name", &self.factory_name)
541            .field("discard_settings", &self.discard_settings)
542            .field("is_draining", &self.is_draining)
543            .finish()
544    }
545}
546
547impl<TKey, TMsg> WorkerProperties<TKey, TMsg>
548where
549    TKey: JobKey,
550    TMsg: Message,
551{
552    fn get_next_non_expired_job(&mut self) -> Option<Job<TKey, TMsg>> {
553        while let Some(mut job) = self.message_queue.pop_front() {
554            if !job.is_expired() {
555                return Some(job);
556            } else {
557                if let Some(handler) = &self.discard_handler {
558                    handler.discard(DiscardReason::TtlExpired, &mut job);
559                }
560                self.stats.job_ttl_expired(&self.factory_name, 1);
561            }
562        }
563        None
564    }
565
566    pub(crate) fn new(
567        factory_name: String,
568        wid: WorkerId,
569        actor: ActorRef<WorkerMessage<TKey, TMsg>>,
570        discard_settings: WorkerDiscardSettings,
571        discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
572        handle: JoinHandle<()>,
573        stats: Option<Arc<dyn FactoryStatsLayer>>,
574    ) -> Self {
575        Self {
576            factory_name,
577            actor,
578            discard_settings,
579            discard_handler,
580            message_queue: VecDeque::new(),
581            curr_jobs: HashMap::new(),
582            wid,
583            is_ping_pending: false,
584            stats,
585            handle: Some(handle),
586            is_draining: false,
587            last_ping: Instant::now(),
588        }
589    }
590
591    pub(crate) fn get_join_handle(&mut self) -> Option<JoinHandle<()>> {
592        self.handle.take()
593    }
594
595    pub(crate) fn is_pid(&self, pid: ActorId) -> bool {
596        self.actor.get_id() == pid
597    }
598
599    /// Identifies if a worker is processing a specific job key
600    ///
601    /// Returns true if the worker is currently processing the given key
602    pub fn is_processing_key(&self, key: &TKey) -> bool {
603        self.curr_jobs.contains_key(key)
604    }
605
606    pub(crate) fn replace_worker(
607        &mut self,
608        nworker: ActorRef<WorkerMessage<TKey, TMsg>>,
609        handle: JoinHandle<()>,
610    ) -> Result<(), ActorProcessingErr> {
611        // these jobs are now "lost" as the worker is going to be killed
612        self.is_ping_pending = false;
613        self.last_ping = Instant::now();
614        self.curr_jobs.clear();
615
616        self.actor = nworker;
617        self.handle = Some(handle);
618        if let Some(mut job) = self.get_next_non_expired_job() {
619            self.curr_jobs.insert(job.key.clone(), job.options.clone());
620            job.set_worker_time();
621            self.actor.cast(WorkerMessage::Dispatch(job))?;
622        }
623        Ok(())
624    }
625
626    /// Identify if the worker is available for enqueueing work
627    pub fn is_available(&self) -> bool {
628        self.curr_jobs.is_empty() && self.message_queue.is_empty()
629    }
630
631    /// Identify if the worker is currently processing any requests
632    pub fn is_working(&self) -> bool {
633        !self.is_available()
634    }
635
636    /// Denotes if the worker is stuck (i.e. unable to complete it's current job)
637    pub(crate) fn is_stuck(&self, duration: Duration) -> bool {
638        if Instant::now() - self.last_ping > duration {
639            let key_strings = self
640                .curr_jobs
641                .keys()
642                .cloned()
643                .fold(String::new(), |a, key| format!("{a}\nJob key: {key:?}"));
644            tracing::warn!("Stuck worker: {}. Last jobs:\n{key_strings}", self.wid);
645            true
646        } else {
647            false
648        }
649    }
650
651    /// Enqueue a new job to this worker. If the discard threshold has been exceeded
652    /// it will discard the oldest or newest elements from the message queue (based
653    /// on discard semantics)
654    pub fn enqueue_job(
655        &mut self,
656        mut job: Job<TKey, TMsg>,
657    ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
658        // track per-job statistics
659        self.stats.new_job(&self.factory_name);
660
661        if let Some((limit, DiscardMode::Newest)) = self.discard_settings.get_limit_and_mode() {
662            if limit > 0 && self.message_queue.len() >= limit {
663                // Discard THIS job as it's the newest one
664                self.stats.job_discarded(&self.factory_name);
665                if let Some(handler) = &self.discard_handler {
666                    handler.discard(DiscardReason::Loadshed, &mut job);
667                }
668                job.reject();
669                return Ok(());
670            }
671        }
672
673        // if the job isn't front-load shedded, it's "accepted"
674        job.accept();
675        if self.curr_jobs.is_empty() {
676            self.curr_jobs.insert(job.key.clone(), job.options.clone());
677            if let Some(mut older_job) = self.get_next_non_expired_job() {
678                self.message_queue.push_back(job);
679                older_job.set_worker_time();
680                self.actor.cast(WorkerMessage::Dispatch(older_job))?;
681            } else {
682                job.set_worker_time();
683                self.actor.cast(WorkerMessage::Dispatch(job))?;
684            }
685            return Ok(());
686        }
687        self.message_queue.push_back(job);
688
689        if let Some((limit, DiscardMode::Oldest)) = self.discard_settings.get_limit_and_mode() {
690            // load-shed the OLDEST jobs
691            while limit > 0 && self.message_queue.len() > limit {
692                if let Some(mut discarded) = self.get_next_non_expired_job() {
693                    self.stats.job_discarded(&self.factory_name);
694                    if let Some(handler) = &self.discard_handler {
695                        handler.discard(DiscardReason::Loadshed, &mut discarded);
696                    }
697                }
698            }
699        }
700        Ok(())
701    }
702
703    /// Send a ping to the worker
704    pub(crate) fn send_factory_ping(
705        &mut self,
706    ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
707        if !self.is_ping_pending {
708            self.is_ping_pending = true;
709            Ok(self
710                .actor
711                .cast(WorkerMessage::FactoryPing(Instant::now()))?)
712        } else {
713            // don't send a new ping if one is currently pending
714            Ok(())
715        }
716    }
717
718    /// Comes back when a ping went out
719    pub(crate) fn ping_received(&mut self, time: Duration, discard_limit: usize) {
720        self.discard_settings.update_worker_limit(discard_limit);
721        self.stats.worker_ping_received(&self.factory_name, time);
722        self.is_ping_pending = false;
723    }
724
725    /// Called when the factory is notified a worker completed a job. Will push the next message
726    /// if there is any messages in this worker's queue
727    pub(crate) fn worker_complete(
728        &mut self,
729        key: TKey,
730    ) -> Result<Option<JobOptions>, Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
731        // remove this pending job
732        let options = self.curr_jobs.remove(&key);
733        // maybe queue up the next job
734        if let Some(mut job) = self.get_next_non_expired_job() {
735            self.curr_jobs.insert(job.key.clone(), job.options.clone());
736            job.set_worker_time();
737            self.actor.cast(WorkerMessage::Dispatch(job))?;
738        }
739
740        Ok(options)
741    }
742
743    /// Set the draining status of the worker
744    pub(crate) fn set_draining(&mut self, is_draining: bool) {
745        self.is_draining = is_draining;
746    }
747}