1use std::collections::HashMap;
9use std::collections::VecDeque;
10use std::fmt::Debug;
11#[cfg(not(feature = "async-trait"))]
12use std::future::Future;
13use std::sync::Arc;
14
15use bon::Builder;
16use tracing::Instrument;
17
18use super::discard::DiscardMode;
19use super::discard::WorkerDiscardSettings;
20use super::stats::FactoryStatsLayer;
21use super::DiscardHandler;
22use super::DiscardReason;
23use super::FactoryMessage;
24use super::Job;
25use super::JobKey;
26use super::JobOptions;
27use super::WorkerId;
28use crate::concurrency::Duration;
29use crate::concurrency::Instant;
30use crate::concurrency::JoinHandle;
31use crate::Actor;
32use crate::ActorCell;
33use crate::ActorId;
34use crate::ActorProcessingErr;
35use crate::ActorRef;
36use crate::Message;
37use crate::MessagingErr;
38use crate::SupervisionEvent;
39
/// Configuration for the factory's "dead man's switch": detection of workers that appear stuck.
///
/// NOTE(review): the consumer of this configuration is not visible in this file; presumably the
/// factory passes `detection_timeout` to `WorkerProperties::is_stuck` — confirm.
#[derive(Builder, Debug)]
pub struct DeadMansSwitchConfiguration {
    /// How long a worker may go without a liveness confirmation before it is treated as stuck
    /// (assumed from usage with `is_stuck`; see note above).
    pub detection_timeout: Duration,
    /// Presumably whether a worker detected as stuck should be killed.
    /// Defaults to `true` when the value is constructed through the generated builder.
    #[builder(default = true)]
    pub kill_worker: bool,
}
52
53#[cfg_attr(feature = "async-trait", crate::async_trait)]
66pub trait Worker: Send + Sync + 'static {
67 type Key: JobKey;
69 type Message: Message;
71 type Arguments: Message;
73 type State: crate::State;
75
76 #[cfg(not(feature = "async-trait"))]
92 fn pre_start(
93 &self,
94 wid: WorkerId,
95 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
96 args: Self::Arguments,
97 ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;
98
99 #[cfg(feature = "async-trait")]
115 async fn pre_start(
116 &self,
117 wid: WorkerId,
118 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
119 args: Self::Arguments,
120 ) -> Result<Self::State, ActorProcessingErr>;
121
122 #[allow(unused_variables)]
133 #[cfg(not(feature = "async-trait"))]
134 fn post_start(
135 &self,
136 wid: WorkerId,
137 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
138 state: &mut Self::State,
139 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
140 async { Ok(()) }
141 }
142 #[allow(unused_variables)]
153 #[cfg(feature = "async-trait")]
154 async fn post_start(
155 &self,
156 wid: WorkerId,
157 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
158 state: &mut Self::State,
159 ) -> Result<(), ActorProcessingErr> {
160 Ok(())
161 }
162
163 #[allow(unused_variables)]
173 #[cfg(not(feature = "async-trait"))]
174 fn post_stop(
175 &self,
176 wid: WorkerId,
177 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
178 state: &mut Self::State,
179 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
180 async { Ok(()) }
181 }
182 #[allow(unused_variables)]
192 #[cfg(feature = "async-trait")]
193 async fn post_stop(
194 &self,
195 wid: WorkerId,
196 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
197 state: &mut Self::State,
198 ) -> Result<(), ActorProcessingErr> {
199 Ok(())
200 }
201
202 #[allow(unused_variables)]
212 #[cfg(not(feature = "async-trait"))]
213 fn handle(
214 &self,
215 wid: WorkerId,
216 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
217 job: Job<Self::Key, Self::Message>,
218 state: &mut Self::State,
219 ) -> impl Future<Output = Result<Self::Key, ActorProcessingErr>> + Send {
220 async { Ok(job.key) }
221 }
222
223 #[allow(unused_variables)]
233 #[cfg(feature = "async-trait")]
234 async fn handle(
235 &self,
236 wid: WorkerId,
237 factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
238 job: Job<Self::Key, Self::Message>,
239 state: &mut Self::State,
240 ) -> Result<Self::Key, ActorProcessingErr> {
241 Ok(job.key)
242 }
243
244 #[allow(unused_variables)]
252 #[cfg(not(feature = "async-trait"))]
253 fn handle_supervisor_evt(
254 &self,
255 myself: ActorCell,
256 message: SupervisionEvent,
257 state: &mut Self::State,
258 ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
259 async move {
260 match message {
261 SupervisionEvent::ActorTerminated(who, _, _)
262 | SupervisionEvent::ActorFailed(who, _) => {
263 myself.stop(None);
264 }
265 _ => {}
266 }
267 Ok(())
268 }
269 }
270 #[allow(unused_variables)]
278 #[cfg(feature = "async-trait")]
279 async fn handle_supervisor_evt(
280 &self,
281 myself: ActorCell,
282 message: SupervisionEvent,
283 state: &mut Self::State,
284 ) -> Result<(), ActorProcessingErr> {
285 match message {
286 SupervisionEvent::ActorTerminated(who, _, _)
287 | SupervisionEvent::ActorFailed(who, _) => {
288 myself.stop(None);
289 }
290 _ => {}
291 }
292 Ok(())
293 }
294}
295
/// Actor state used by the blanket `Actor` implementation for [`Worker`] types.
///
/// Bundles the factory handle and worker id with the user-defined `Worker::State`.
#[doc(hidden)]
pub struct WorkerState<TWorker: Worker> {
    /// Handle to the owning factory; used to send `WorkerPong` and `Finished` messages.
    factory: ActorRef<FactoryMessage<TWorker::Key, TWorker::Message>>,
    /// Identifier this worker includes in the messages it sends to the factory.
    wid: WorkerId,
    /// The user-defined state produced by `Worker::pre_start`.
    state: TWorker::State,
}
304
305impl<TWorker: Worker> std::fmt::Debug for WorkerState<TWorker> {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 write!(f, "WorkerState")
308 }
309}
310
311#[cfg_attr(feature = "async-trait", crate::async_trait)]
312impl<T> Actor for T
313where
314 T: Worker,
315{
316 type Msg = WorkerMessage<<Self as Worker>::Key, <Self as Worker>::Message>;
317 type Arguments = WorkerStartContext<
318 <Self as Worker>::Key,
319 <Self as Worker>::Message,
320 <Self as Worker>::Arguments,
321 >;
322 type State = WorkerState<Self>;
323
324 async fn pre_start(
325 &self,
326 _: ActorRef<Self::Msg>,
327 WorkerStartContext {
328 wid,
329 factory,
330 custom_start,
331 }: Self::Arguments,
332 ) -> Result<Self::State, ActorProcessingErr> {
333 let inner_state = <Self as Worker>::pre_start(self, wid, &factory, custom_start).await?;
334 Ok(Self::State {
335 wid,
336 factory,
337 state: inner_state,
338 })
339 }
340
341 async fn post_start(
342 &self,
343 _: ActorRef<Self::Msg>,
344 state: &mut Self::State,
345 ) -> Result<(), ActorProcessingErr> {
346 <Self as Worker>::post_start(self, state.wid, &state.factory, &mut state.state).await
347 }
348
349 async fn post_stop(
350 &self,
351 _: ActorRef<Self::Msg>,
352 state: &mut Self::State,
353 ) -> Result<(), ActorProcessingErr> {
354 <Self as Worker>::post_stop(self, state.wid, &state.factory, &mut state.state).await
355 }
356
357 async fn handle(
358 &self,
359 _: ActorRef<Self::Msg>,
360 message: Self::Msg,
361 state: &mut Self::State,
362 ) -> Result<(), ActorProcessingErr> {
363 match message {
364 WorkerMessage::FactoryPing(time) => {
365 tracing::trace!("Worker {} - ping", state.wid);
366
367 state
368 .factory
369 .cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
370 Ok(())
371 }
372 WorkerMessage::Dispatch(mut job) => {
373 let key = if let Some(span) = job.options.take_span() {
374 <Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
375 .instrument(span)
376 .await
377 } else {
378 <Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
379 .await
380 }?;
381 state
382 .factory
383 .cast(FactoryMessage::Finished(state.wid, key))?;
384 Ok(())
385 }
386 }
387 }
388
389 async fn handle_supervisor_evt(
390 &self,
391 myself: ActorRef<Self::Msg>,
392 message: SupervisionEvent,
393 state: &mut Self::State,
394 ) -> Result<(), ActorProcessingErr> {
395 <Self as Worker>::handle_supervisor_evt(self, myself.into(), message, &mut state.state)
396 .await
397 }
398}
399
/// Constructs worker actors, together with their startup arguments, for a factory.
///
/// NOTE(review): the call site is not in this file; presumably the factory calls `build` once
/// per worker slot (and again when a worker is replaced) — confirm.
pub trait WorkerBuilder<TWorker, TWorkerStart>: Send + Sync
where
    TWorker: Actor,
    TWorkerStart: Message,
{
    /// Builds a worker for id `wid`, returning the actor instance and the startup argument
    /// value to start it with (presumably placed into `WorkerStartContext::custom_start`).
    fn build(&mut self, wid: WorkerId) -> (TWorker, TWorkerStart);
}
417
/// Controls the size of a factory's worker pool at runtime.
///
/// NOTE(review): purpose inferred from the method name; the consumer is not in this file.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait WorkerCapacityController: 'static + Send + Sync {
    /// Presumably returns the desired number of workers given the `current` pool size.
    #[cfg(feature = "async-trait")]
    async fn get_pool_size(&mut self, current: usize) -> usize;

    /// Presumably returns the desired number of workers given the `current` pool size.
    ///
    /// Boxed-future form used when the `async-trait` feature is disabled.
    #[cfg(not(feature = "async-trait"))]
    fn get_pool_size(&mut self, current: usize) -> futures::future::BoxFuture<'_, usize>;
}
440
/// Messages the factory side sends to a worker actor.
#[derive(Debug)]
pub enum WorkerMessage<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Liveness ping stamped with the instant it was sent; the worker answers with
    /// `FactoryMessage::WorkerPong` carrying the time elapsed since that instant.
    FactoryPing(Instant),
    /// A job for the worker to process; when `Worker::handle` succeeds the worker answers with
    /// `FactoryMessage::Finished` carrying the returned key.
    Dispatch(Job<TKey, TMsg>),
}
458
// With the `cluster` feature enabled, `WorkerMessage` gets an explicit, empty `Message` impl
// (all trait items use their defaults).
// NOTE(review): presumably required because no blanket `Message` impl applies under `cluster`
// — confirm.
#[cfg(feature = "cluster")]
impl<TKey, TMsg> crate::Message for WorkerMessage<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
}
466
/// Startup arguments for a worker actor; unpacked by the blanket `Actor::pre_start`.
#[derive(Debug)]
pub struct WorkerStartContext<TKey, TMsg, TCustomStart>
where
    TKey: JobKey,
    TMsg: Message,
    TCustomStart: Message,
{
    /// Identifier of the worker being started.
    pub wid: WorkerId,

    /// Handle to the owning factory actor.
    pub factory: ActorRef<FactoryMessage<TKey, TMsg>>,

    /// User-defined startup arguments, forwarded to `Worker::pre_start` as `args`.
    pub custom_start: TCustomStart,
}
484
/// Factory-side bookkeeping for a single worker: its actor handle, pending job queue,
/// in-flight jobs, discard policy, and liveness-ping state.
pub struct WorkerProperties<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// This worker's identifier.
    pub(crate) wid: WorkerId,

    /// Handle to the worker actor that jobs and pings are sent to.
    pub(crate) actor: ActorRef<WorkerMessage<TKey, TMsg>>,

    /// Name of the owning factory, passed to every stats call.
    factory_name: String,

    /// Join handle of the worker actor's task; handed out once via `get_join_handle`.
    handle: Option<JoinHandle<()>>,

    /// Jobs accepted for this worker but not yet dispatched, oldest at the front.
    message_queue: VecDeque<Job<TKey, TMsg>>,

    /// Queue limit and discard mode applied when jobs are enqueued.
    pub(crate) discard_settings: WorkerDiscardSettings,

    /// Optional callback invoked for every discarded job (load-shed or TTL-expired).
    pub(crate) discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,

    /// Whether a `FactoryPing` has been sent and its pong has not been processed yet.
    is_ping_pending: bool,

    /// Reference instant that `is_stuck` measures elapsed time against.
    last_ping: Instant,

    /// Optional statistics sink.
    stats: Option<Arc<dyn FactoryStatsLayer>>,

    /// Jobs sent to the worker actor and not yet reported complete, keyed by job key.
    curr_jobs: HashMap<TKey, JobOptions>,

    /// Draining flag set through `set_draining`; how it is acted upon is decided by callers
    /// outside this file.
    pub(crate) is_draining: bool,
}
530
531impl<TKey, TMsg> Debug for WorkerProperties<TKey, TMsg>
532where
533 TKey: JobKey,
534 TMsg: Message,
535{
536 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
537 f.debug_struct("WorkerProperties")
538 .field("wid", &self.wid)
539 .field("actor", &self.actor)
540 .field("factory_name", &self.factory_name)
541 .field("discard_settings", &self.discard_settings)
542 .field("is_draining", &self.is_draining)
543 .finish()
544 }
545}
546
547impl<TKey, TMsg> WorkerProperties<TKey, TMsg>
548where
549 TKey: JobKey,
550 TMsg: Message,
551{
552 fn get_next_non_expired_job(&mut self) -> Option<Job<TKey, TMsg>> {
553 while let Some(mut job) = self.message_queue.pop_front() {
554 if !job.is_expired() {
555 return Some(job);
556 } else {
557 if let Some(handler) = &self.discard_handler {
558 handler.discard(DiscardReason::TtlExpired, &mut job);
559 }
560 self.stats.job_ttl_expired(&self.factory_name, 1);
561 }
562 }
563 None
564 }
565
566 pub(crate) fn new(
567 factory_name: String,
568 wid: WorkerId,
569 actor: ActorRef<WorkerMessage<TKey, TMsg>>,
570 discard_settings: WorkerDiscardSettings,
571 discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
572 handle: JoinHandle<()>,
573 stats: Option<Arc<dyn FactoryStatsLayer>>,
574 ) -> Self {
575 Self {
576 factory_name,
577 actor,
578 discard_settings,
579 discard_handler,
580 message_queue: VecDeque::new(),
581 curr_jobs: HashMap::new(),
582 wid,
583 is_ping_pending: false,
584 stats,
585 handle: Some(handle),
586 is_draining: false,
587 last_ping: Instant::now(),
588 }
589 }
590
591 pub(crate) fn get_join_handle(&mut self) -> Option<JoinHandle<()>> {
592 self.handle.take()
593 }
594
595 pub(crate) fn is_pid(&self, pid: ActorId) -> bool {
596 self.actor.get_id() == pid
597 }
598
599 pub fn is_processing_key(&self, key: &TKey) -> bool {
603 self.curr_jobs.contains_key(key)
604 }
605
606 pub(crate) fn replace_worker(
607 &mut self,
608 nworker: ActorRef<WorkerMessage<TKey, TMsg>>,
609 handle: JoinHandle<()>,
610 ) -> Result<(), ActorProcessingErr> {
611 self.is_ping_pending = false;
613 self.last_ping = Instant::now();
614 self.curr_jobs.clear();
615
616 self.actor = nworker;
617 self.handle = Some(handle);
618 if let Some(mut job) = self.get_next_non_expired_job() {
619 self.curr_jobs.insert(job.key.clone(), job.options.clone());
620 job.set_worker_time();
621 self.actor.cast(WorkerMessage::Dispatch(job))?;
622 }
623 Ok(())
624 }
625
626 pub fn is_available(&self) -> bool {
628 self.curr_jobs.is_empty() && self.message_queue.is_empty()
629 }
630
631 pub fn is_working(&self) -> bool {
633 !self.is_available()
634 }
635
636 pub(crate) fn is_stuck(&self, duration: Duration) -> bool {
638 if Instant::now() - self.last_ping > duration {
639 let key_strings = self
640 .curr_jobs
641 .keys()
642 .cloned()
643 .fold(String::new(), |a, key| format!("{a}\nJob key: {key:?}"));
644 tracing::warn!("Stuck worker: {}. Last jobs:\n{key_strings}", self.wid);
645 true
646 } else {
647 false
648 }
649 }
650
651 pub fn enqueue_job(
655 &mut self,
656 mut job: Job<TKey, TMsg>,
657 ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
658 self.stats.new_job(&self.factory_name);
660
661 if let Some((limit, DiscardMode::Newest)) = self.discard_settings.get_limit_and_mode() {
662 if limit > 0 && self.message_queue.len() >= limit {
663 self.stats.job_discarded(&self.factory_name);
665 if let Some(handler) = &self.discard_handler {
666 handler.discard(DiscardReason::Loadshed, &mut job);
667 }
668 job.reject();
669 return Ok(());
670 }
671 }
672
673 job.accept();
675 if self.curr_jobs.is_empty() {
676 self.curr_jobs.insert(job.key.clone(), job.options.clone());
677 if let Some(mut older_job) = self.get_next_non_expired_job() {
678 self.message_queue.push_back(job);
679 older_job.set_worker_time();
680 self.actor.cast(WorkerMessage::Dispatch(older_job))?;
681 } else {
682 job.set_worker_time();
683 self.actor.cast(WorkerMessage::Dispatch(job))?;
684 }
685 return Ok(());
686 }
687 self.message_queue.push_back(job);
688
689 if let Some((limit, DiscardMode::Oldest)) = self.discard_settings.get_limit_and_mode() {
690 while limit > 0 && self.message_queue.len() > limit {
692 if let Some(mut discarded) = self.get_next_non_expired_job() {
693 self.stats.job_discarded(&self.factory_name);
694 if let Some(handler) = &self.discard_handler {
695 handler.discard(DiscardReason::Loadshed, &mut discarded);
696 }
697 }
698 }
699 }
700 Ok(())
701 }
702
703 pub(crate) fn send_factory_ping(
705 &mut self,
706 ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
707 if !self.is_ping_pending {
708 self.is_ping_pending = true;
709 Ok(self
710 .actor
711 .cast(WorkerMessage::FactoryPing(Instant::now()))?)
712 } else {
713 Ok(())
715 }
716 }
717
718 pub(crate) fn ping_received(&mut self, time: Duration, discard_limit: usize) {
720 self.discard_settings.update_worker_limit(discard_limit);
721 self.stats.worker_ping_received(&self.factory_name, time);
722 self.is_ping_pending = false;
723 }
724
725 pub(crate) fn worker_complete(
728 &mut self,
729 key: TKey,
730 ) -> Result<Option<JobOptions>, Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
731 let options = self.curr_jobs.remove(&key);
733 if let Some(mut job) = self.get_next_non_expired_job() {
735 self.curr_jobs.insert(job.key.clone(), job.options.clone());
736 job.set_worker_time();
737 self.actor.cast(WorkerMessage::Dispatch(job))?;
738 }
739
740 Ok(options)
741 }
742
743 pub(crate) fn set_draining(&mut self, is_draining: bool) {
745 self.is_draining = is_draining;
746 }
747}