use std::collections::HashMap;
use std::collections::VecDeque;
use std::fmt::Debug;
#[cfg(not(feature = "async-trait"))]
use std::future::Future;
use std::sync::Arc;
use bon::Builder;
use tracing::Instrument;
use super::discard::DiscardMode;
use super::discard::WorkerDiscardSettings;
use super::stats::FactoryStatsLayer;
use super::DiscardHandler;
use super::DiscardReason;
use super::FactoryMessage;
use super::Job;
use super::JobKey;
use super::JobOptions;
use super::WorkerId;
use crate::concurrency::Duration;
use crate::concurrency::Instant;
use crate::concurrency::JoinHandle;
use crate::Actor;
use crate::ActorCell;
use crate::ActorId;
use crate::ActorProcessingErr;
use crate::ActorRef;
use crate::Message;
use crate::MessagingErr;
use crate::SupervisionEvent;
/// Settings for a factory's "dead man's switch", which looks for workers that
/// appear stuck.
///
/// NOTE(review): the factory-side logic that consumes this configuration is not
/// part of this file; `detection_timeout` is presumably the `duration` handed to
/// `WorkerProperties::is_stuck` — confirm against the factory.
#[derive(Debug, Builder)]
pub struct DeadMansSwitchConfiguration {
    /// How long a worker may go unresponsive before it is considered stuck.
    pub detection_timeout: Duration,
    /// Whether a worker detected as stuck should be killed.
    /// Defaults to `true` when built through the generated builder.
    #[builder(default = true)]
    pub kill_worker: bool,
}
/// Job-processing logic for a worker that belongs to a factory.
///
/// Every type implementing `Worker` is also an [`Actor`] through the blanket
/// implementation in this module, which forwards the actor lifecycle hooks and
/// incoming [`WorkerMessage`]s to the methods below. Each hook is declared twice:
/// one form (returning `impl Future`) is compiled without the `async-trait`
/// feature, the other (a plain `async fn`) with it. Default bodies ignore some of
/// their parameters, which is why they carry `#[allow(unused_variables)]`.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait Worker: Send + Sync + 'static {
    /// Key identifying each job this worker processes.
    type Key: JobKey;
    /// Payload carried by each job.
    type Message: Message;
    /// Custom arguments handed to [`Worker::pre_start`].
    type Arguments: Message;
    /// Worker-owned state created by [`Worker::pre_start`].
    type State: crate::State;

    /// Creates the worker's state from its id `wid`, a handle to the owning
    /// `factory` and the custom start `args`. Errors are propagated out of the
    /// actor's `pre_start`.
    #[cfg(not(feature = "async-trait"))]
    fn pre_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        args: Self::Arguments,
    ) -> impl Future<Output = Result<Self::State, ActorProcessingErr>> + Send;

    /// Creates the worker's state from its id `wid`, a handle to the owning
    /// `factory` and the custom start `args`. Errors are propagated out of the
    /// actor's `pre_start`.
    #[cfg(feature = "async-trait")]
    async fn pre_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        args: Self::Arguments,
    ) -> Result<Self::State, ActorProcessingErr>;

    /// Hook invoked from the actor's `post_start`. The default does nothing.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn post_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async { Ok(()) }
    }

    /// Hook invoked from the actor's `post_start`. The default does nothing.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn post_start(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        Ok(())
    }

    /// Hook invoked from the actor's `post_stop`. The default does nothing.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn post_stop(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async { Ok(()) }
    }

    /// Hook invoked from the actor's `post_stop`. The default does nothing.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn post_stop(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        Ok(())
    }

    /// Processes one `job` and returns the key that is reported back to the
    /// factory as `FactoryMessage::Finished`. The default performs no work and
    /// returns the job's own key.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn handle(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        job: Job<Self::Key, Self::Message>,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<Self::Key, ActorProcessingErr>> + Send {
        async { Ok(job.key) }
    }

    /// Processes one `job` and returns the key that is reported back to the
    /// factory as `FactoryMessage::Finished`. The default performs no work and
    /// returns the job's own key.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn handle(
        &self,
        wid: WorkerId,
        factory: &ActorRef<FactoryMessage<Self::Key, Self::Message>>,
        job: Job<Self::Key, Self::Message>,
        state: &mut Self::State,
    ) -> Result<Self::Key, ActorProcessingErr> {
        Ok(job.key)
    }

    /// Handles a supervision event delivered to this worker's actor (`myself`).
    /// The default stops the worker when the event reports an actor that
    /// terminated or failed, and ignores every other event.
    #[allow(unused_variables)]
    #[cfg(not(feature = "async-trait"))]
    fn handle_supervisor_evt(
        &self,
        myself: ActorCell,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> impl Future<Output = Result<(), ActorProcessingErr>> + Send {
        async move {
            if let SupervisionEvent::ActorTerminated(..) | SupervisionEvent::ActorFailed(..) =
                message
            {
                myself.stop(None);
            }
            Ok(())
        }
    }

    /// Handles a supervision event delivered to this worker's actor (`myself`).
    /// The default stops the worker when the event reports an actor that
    /// terminated or failed, and ignores every other event.
    #[allow(unused_variables)]
    #[cfg(feature = "async-trait")]
    async fn handle_supervisor_evt(
        &self,
        myself: ActorCell,
        message: SupervisionEvent,
        state: &mut Self::State,
    ) -> Result<(), ActorProcessingErr> {
        if let SupervisionEvent::ActorTerminated(..) | SupervisionEvent::ActorFailed(..) = message
        {
            myself.stop(None);
        }
        Ok(())
    }
}
/// Actor state used by the blanket `Actor` implementation for [`Worker`] types.
///
/// It pairs the user's own worker state with the worker id and the handle to the
/// owning factory that the actor needs for replies.
#[doc(hidden)]
pub struct WorkerState<TWorker: Worker> {
    /// This worker's id, included in every message sent to the factory.
    wid: WorkerId,
    /// Handle to the owning factory (target of pongs and `Finished` reports).
    factory: ActorRef<FactoryMessage<TWorker::Key, TWorker::Message>>,
    /// State returned by [`Worker::pre_start`].
    state: TWorker::State,
}
/// Minimal `Debug` output: only the type name is printed; none of the wrapped
/// fields are formatted.
impl<TWorker: Worker> std::fmt::Debug for WorkerState<TWorker> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("WorkerState")
    }
}
#[cfg_attr(feature = "async-trait", crate::async_trait)]
impl<T> Actor for T
where
T: Worker,
{
type Msg = WorkerMessage<<Self as Worker>::Key, <Self as Worker>::Message>;
type Arguments = WorkerStartContext<
<Self as Worker>::Key,
<Self as Worker>::Message,
<Self as Worker>::Arguments,
>;
type State = WorkerState<Self>;
async fn pre_start(
&self,
_: ActorRef<Self::Msg>,
WorkerStartContext {
wid,
factory,
custom_start,
}: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
let inner_state = <Self as Worker>::pre_start(self, wid, &factory, custom_start).await?;
Ok(Self::State {
wid,
factory,
state: inner_state,
})
}
async fn post_start(
&self,
_: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as Worker>::post_start(self, state.wid, &state.factory, &mut state.state).await
}
async fn post_stop(
&self,
_: ActorRef<Self::Msg>,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as Worker>::post_stop(self, state.wid, &state.factory, &mut state.state).await
}
async fn handle(
&self,
_: ActorRef<Self::Msg>,
message: Self::Msg,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
WorkerMessage::FactoryPing(time) => {
tracing::trace!("Worker {} - ping", state.wid);
state
.factory
.cast(FactoryMessage::WorkerPong(state.wid, time.elapsed()))?;
Ok(())
}
WorkerMessage::Dispatch(mut job) => {
let key = if let Some(span) = job.options.take_span() {
<Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
.instrument(span)
.await
} else {
<Self as Worker>::handle(self, state.wid, &state.factory, job, &mut state.state)
.await
}?;
state
.factory
.cast(FactoryMessage::Finished(state.wid, key))?;
Ok(())
}
}
}
async fn handle_supervisor_evt(
&self,
myself: ActorRef<Self::Msg>,
message: SupervisionEvent,
state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
<Self as Worker>::handle_supervisor_evt(self, myself.into(), message, &mut state.state)
.await
}
}
/// Constructs worker actors together with their start arguments.
///
/// NOTE(review): presumably invoked by the factory whenever a worker slot needs
/// (re)spawning — the call sites are not in this file.
pub trait WorkerBuilder<TWorker: Actor, TWorkerStart: Message>: Send + Sync {
    /// Builds the worker for id `wid` and the arguments it should start with.
    fn build(&mut self, wid: WorkerId) -> (TWorker, TWorkerStart);
}
/// Decides how many workers a factory's pool should contain.
///
/// The method is declared once per `async-trait` feature setting; exactly one
/// form is compiled in.
#[cfg_attr(feature = "async-trait", crate::async_trait)]
pub trait WorkerCapacityController: Send + Sync + 'static {
    /// Returns the desired pool size, given the `current` pool size.
    #[cfg(not(feature = "async-trait"))]
    fn get_pool_size(&mut self, current: usize) -> futures::future::BoxFuture<'_, usize>;

    /// Returns the desired pool size, given the `current` pool size.
    #[cfg(feature = "async-trait")]
    async fn get_pool_size(&mut self, current: usize) -> usize;
}
/// Messages delivered to a single worker actor.
#[derive(Debug)]
pub enum WorkerMessage<TKey: JobKey, TMsg: Message> {
    /// Liveness probe stamped with the instant it was sent; the worker answers
    /// with `FactoryMessage::WorkerPong` and the elapsed time since that instant.
    FactoryPing(Instant),
    /// A job to run; on success the worker reports `FactoryMessage::Finished`.
    Dispatch(Job<TKey, TMsg>),
}
// With the `cluster` feature enabled, `WorkerMessage` gets an explicit
// `Message` impl relying entirely on the trait's default items.
#[cfg(feature = "cluster")]
impl<TKey: JobKey, TMsg: Message> crate::Message for WorkerMessage<TKey, TMsg> {}
/// Start arguments of a worker actor built on [`Worker`].
#[derive(Debug)]
pub struct WorkerStartContext<TKey: JobKey, TMsg: Message, TCustomStart: Message> {
    /// The worker's id within its factory.
    pub wid: WorkerId,
    /// Handle to the owning factory.
    pub factory: ActorRef<FactoryMessage<TKey, TMsg>>,
    /// User-supplied arguments forwarded to [`Worker::pre_start`].
    pub custom_start: TCustomStart,
}
/// Factory-side bookkeeping for one worker: its actor, pending queue, in-flight
/// jobs, load-shedding configuration and ping status.
pub struct WorkerProperties<TKey: JobKey, TMsg: Message> {
    /// The worker's id.
    pub(crate) wid: WorkerId,
    /// The current worker actor (swapped by `replace_worker`).
    pub(crate) actor: ActorRef<WorkerMessage<TKey, TMsg>>,
    /// Factory name passed to every stats call.
    factory_name: String,
    /// Join handle of the worker actor; taken out by `get_join_handle`.
    handle: Option<JoinHandle<()>>,
    /// Jobs waiting to be dispatched, oldest at the front.
    message_queue: VecDeque<Job<TKey, TMsg>>,
    /// Queue limit and discard mode used for load-shedding.
    pub(crate) discard_settings: WorkerDiscardSettings,
    /// Optional callback informed about every discarded job.
    pub(crate) discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
    /// A ping was sent and no reply has been recorded yet.
    is_ping_pending: bool,
    /// Reference instant that `is_stuck` measures against.
    last_ping: Instant,
    /// Optional statistics sink.
    stats: Option<Arc<dyn FactoryStatsLayer>>,
    /// Jobs currently dispatched to the actor, with their options.
    curr_jobs: HashMap<TKey, JobOptions>,
    /// Draining flag set through `set_draining`.
    /// NOTE(review): consumed by the factory; semantics not visible here.
    pub(crate) is_draining: bool,
}
/// Prints the identifying fields only; the queue, in-flight jobs, handlers and
/// stats sink are left out.
impl<TKey: JobKey, TMsg: Message> Debug for WorkerProperties<TKey, TMsg> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("WorkerProperties");
        out.field("wid", &self.wid);
        out.field("actor", &self.actor);
        out.field("factory_name", &self.factory_name);
        out.field("discard_settings", &self.discard_settings);
        out.field("is_draining", &self.is_draining);
        out.finish()
    }
}
impl<TKey, TMsg> WorkerProperties<TKey, TMsg>
where
    TKey: JobKey,
    TMsg: Message,
{
    /// Pops jobs off the front of the queue until one that has not expired is
    /// found, and returns it.
    ///
    /// Expired jobs met along the way are passed to the discard handler (if any)
    /// with [`DiscardReason::TtlExpired`] and counted as TTL-expired in the stats.
    /// Returns `None` once the queue is exhausted.
    fn get_next_non_expired_job(&mut self) -> Option<Job<TKey, TMsg>> {
        while let Some(mut job) = self.message_queue.pop_front() {
            if !job.is_expired() {
                return Some(job);
            }
            if let Some(handler) = &self.discard_handler {
                handler.discard(DiscardReason::TtlExpired, &mut job);
            }
            self.stats.job_ttl_expired(&self.factory_name, 1);
        }
        None
    }

    /// Creates the record for a newly spawned worker actor.
    ///
    /// The record starts with an empty queue, nothing in flight, no ping pending,
    /// `last_ping` set to now and draining disabled.
    pub(crate) fn new(
        factory_name: String,
        wid: WorkerId,
        actor: ActorRef<WorkerMessage<TKey, TMsg>>,
        discard_settings: WorkerDiscardSettings,
        discard_handler: Option<Arc<dyn DiscardHandler<TKey, TMsg>>>,
        handle: JoinHandle<()>,
        stats: Option<Arc<dyn FactoryStatsLayer>>,
    ) -> Self {
        Self {
            factory_name,
            actor,
            discard_settings,
            discard_handler,
            message_queue: VecDeque::new(),
            curr_jobs: HashMap::new(),
            wid,
            is_ping_pending: false,
            stats,
            handle: Some(handle),
            is_draining: false,
            last_ping: Instant::now(),
        }
    }

    /// Takes the worker actor's join handle; later calls return `None` until a
    /// new handle is installed by `replace_worker`.
    pub(crate) fn get_join_handle(&mut self) -> Option<JoinHandle<()>> {
        self.handle.take()
    }

    /// Whether `pid` is the id of the current worker actor.
    pub(crate) fn is_pid(&self, pid: ActorId) -> bool {
        self.actor.get_id() == pid
    }

    /// Whether a job with `key` is currently dispatched to this worker.
    /// Jobs that are only queued are not counted.
    pub fn is_processing_key(&self, key: &TKey) -> bool {
        self.curr_jobs.contains_key(key)
    }

    /// Installs a replacement worker actor and its join handle.
    ///
    /// Ping state is reset, `last_ping` restarts from now, and the in-flight set
    /// is cleared (jobs the previous actor was running are not re-dispatched).
    /// The next non-expired queued job, if any, is sent to the new actor.
    ///
    /// # Errors
    /// Fails if dispatching that job to the new actor fails.
    pub(crate) fn replace_worker(
        &mut self,
        nworker: ActorRef<WorkerMessage<TKey, TMsg>>,
        handle: JoinHandle<()>,
    ) -> Result<(), ActorProcessingErr> {
        self.is_ping_pending = false;
        self.last_ping = Instant::now();
        self.curr_jobs.clear();
        self.actor = nworker;
        self.handle = Some(handle);
        if let Some(mut job) = self.get_next_non_expired_job() {
            self.curr_jobs.insert(job.key.clone(), job.options.clone());
            job.set_worker_time();
            self.actor.cast(WorkerMessage::Dispatch(job))?;
        }
        Ok(())
    }

    /// `true` when nothing is in flight and nothing is queued.
    pub fn is_available(&self) -> bool {
        self.curr_jobs.is_empty() && self.message_queue.is_empty()
    }

    /// Negation of [`Self::is_available`].
    pub fn is_working(&self) -> bool {
        !self.is_available()
    }

    /// Returns `true`, and logs a warning listing the in-flight job keys, when
    /// more than `duration` has passed since `last_ping`.
    pub(crate) fn is_stuck(&self, duration: Duration) -> bool {
        if Instant::now() - self.last_ping > duration {
            // One "\nJob key: {key:?}" entry per in-flight job; built with
            // map/collect instead of a cloning, quadratic fold.
            let key_strings: String = self
                .curr_jobs
                .keys()
                .map(|key| format!("\nJob key: {key:?}"))
                .collect();
            tracing::warn!("Stuck worker: {}. Last jobs:\n{key_strings}", self.wid);
            true
        } else {
            false
        }
    }

    /// Accepts a new job for this worker, applying the configured load-shedding.
    ///
    /// * With [`DiscardMode::Newest`] and a non-zero limit, the incoming job is
    ///   discarded and rejected if the queue already holds `limit` jobs.
    /// * If nothing is in flight, the oldest non-expired queued job is dispatched
    ///   immediately. If no such job exists, the incoming job itself is
    ///   dispatched. Otherwise the incoming job is queued behind. Whichever job
    ///   is dispatched is recorded in the in-flight set.
    /// * Otherwise the job is queued. With [`DiscardMode::Oldest`], the oldest
    ///   queued jobs are discarded until the queue is back within the limit.
    ///
    /// # Errors
    /// Returns the messaging error if dispatching to the worker actor fails.
    pub fn enqueue_job(
        &mut self,
        mut job: Job<TKey, TMsg>,
    ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
        self.stats.new_job(&self.factory_name);

        if let Some((limit, DiscardMode::Newest)) = self.discard_settings.get_limit_and_mode() {
            if limit > 0 && self.message_queue.len() >= limit {
                // The incoming job is the newest one, so it is the one shed.
                self.stats.job_discarded(&self.factory_name);
                if let Some(handler) = &self.discard_handler {
                    handler.discard(DiscardReason::Loadshed, &mut job);
                }
                job.reject();
                return Ok(());
            }
        }

        // Any job not shed up-front counts as accepted.
        job.accept();

        if self.curr_jobs.is_empty() {
            // Preserve FIFO order: an older queued job runs first and the new
            // job waits at the back of the queue.
            let mut next = match self.get_next_non_expired_job() {
                Some(older_job) => {
                    self.message_queue.push_back(job);
                    older_job
                }
                None => job,
            };
            // Record the job that is actually being dispatched, so that the
            // key it finishes with matches `curr_jobs` in `worker_complete`.
            self.curr_jobs.insert(next.key.clone(), next.options.clone());
            next.set_worker_time();
            self.actor.cast(WorkerMessage::Dispatch(next))?;
            return Ok(());
        }

        self.message_queue.push_back(job);

        if let Some((limit, DiscardMode::Oldest)) = self.discard_settings.get_limit_and_mode() {
            // Shed from the front (oldest) until back within the limit; this
            // terminates because every iteration pops at least one job.
            while limit > 0 && self.message_queue.len() > limit {
                if let Some(mut discarded) = self.get_next_non_expired_job() {
                    self.stats.job_discarded(&self.factory_name);
                    if let Some(handler) = &self.discard_handler {
                        handler.discard(DiscardReason::Loadshed, &mut discarded);
                    }
                }
            }
        }
        Ok(())
    }

    /// Sends a `FactoryPing` stamped with the current instant, unless a ping is
    /// already pending.
    ///
    /// # Errors
    /// Returns the messaging error if the cast to the worker actor fails.
    pub(crate) fn send_factory_ping(
        &mut self,
    ) -> Result<(), Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
        if !self.is_ping_pending {
            self.is_ping_pending = true;
            Ok(self
                .actor
                .cast(WorkerMessage::FactoryPing(Instant::now()))?)
        } else {
            Ok(())
        }
    }

    /// Records a ping reply from the worker.
    ///
    /// Applies the new `discard_limit`, reports the ping latency `time` to the
    /// stats sink, refreshes `last_ping` and clears the pending-ping flag.
    pub(crate) fn ping_received(&mut self, time: Duration, discard_limit: usize) {
        self.discard_settings.update_worker_limit(discard_limit);
        self.stats.worker_ping_received(&self.factory_name, time);
        // A reply proves the worker is still draining its mailbox. Without this
        // refresh `last_ping` only changes on spawn/replacement, so `is_stuck`
        // would flag every worker once the detection timeout elapses.
        self.last_ping = Instant::now();
        self.is_ping_pending = false;
    }

    /// Marks the in-flight job `key` as finished and dispatches the next
    /// non-expired queued job, if any.
    ///
    /// Returns the options recorded when the finished job was dispatched, or
    /// `None` if `key` was not in flight.
    ///
    /// # Errors
    /// Returns the messaging error if dispatching the next job fails.
    pub(crate) fn worker_complete(
        &mut self,
        key: TKey,
    ) -> Result<Option<JobOptions>, Box<MessagingErr<WorkerMessage<TKey, TMsg>>>> {
        let options = self.curr_jobs.remove(&key);
        if let Some(mut job) = self.get_next_non_expired_job() {
            self.curr_jobs.insert(job.key.clone(), job.options.clone());
            job.set_worker_time();
            self.actor.cast(WorkerMessage::Dispatch(job))?;
        }
        Ok(options)
    }

    /// Sets the draining flag.
    pub(crate) fn set_draining(&mut self, is_draining: bool) {
        self.is_draining = is_draining;
    }
}