use crate::command::{ShutdownMode, WorkerOutcome};
use crate::job::{BoxedExecFn, InstanceId, JobDefinition, TKJobId, TKJobRequest, WorkerId};
use crate::metrics::SchedulerMetrics;
#[cfg(feature = "job_context")]
use crate::job::context::{JobContext, CURRENT_JOB_CONTEXT};
use std::collections::HashMap;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use chrono::{DateTime, Utc};
use fibre::{mpmc, mpsc};
use futures::FutureExt;
use parking_lot::RwLock;
use tokio::sync::watch;
use tracing::{debug, error, info, trace, warn, Instrument};
/// One unit of work on the dispatch queue: the instance to run, the lineage (job definition)
/// it belongs to, and the time it was scheduled for.
type JobDispatchTuple = (InstanceId, TKJobId, DateTime<Utc>);

/// A job-executing worker that pulls dispatched job instances off a shared (multi-consumer)
/// queue, runs them, and reports each result back to the coordinator.
pub(crate) struct Worker {
    /// Identifier attached to this worker's log events.
    id: WorkerId,
    /// Shared registry of job definitions, looked up by lineage id.
    job_definitions: Arc<RwLock<HashMap<TKJobId, JobDefinition>>>,
    /// Scheduler metrics updated by this worker.
    metrics: SchedulerMetrics,
    /// Shutdown signal; a `Some(mode)` value means shutdown has been requested.
    shutdown_rx: watch::Receiver<Option<ShutdownMode>>,
    /// Channel used to report job outcomes to the coordinator.
    worker_outcome_tx: mpsc::BoundedAsyncSender<WorkerOutcome>,
    /// Receiving end of the job dispatch queue.
    job_dispatch_rx: mpmc::AsyncReceiver<JobDispatchTuple>,
    /// Shared count of busy workers; this worker decrements it after each executed job.
    active_workers_counter: Arc<AtomicUsize>,
}
impl Worker {
#[allow(clippy::too_many_arguments)] pub fn new(
id: usize,
job_definitions: Arc<RwLock<HashMap<TKJobId, JobDefinition>>>,
metrics: SchedulerMetrics,
shutdown_rx: watch::Receiver<Option<ShutdownMode>>,
worker_outcome_tx: mpsc::BoundedAsyncSender<WorkerOutcome>,
job_dispatch_rx: mpmc::AsyncReceiver<JobDispatchTuple>,
active_workers_counter: Arc<AtomicUsize>,
) -> Self {
Self {
id,
job_definitions,
metrics,
shutdown_rx,
worker_outcome_tx,
job_dispatch_rx,
active_workers_counter,
}
}
pub async fn run(&mut self) {
info!(worker_id = self.id, "Worker started. Waiting for jobs...");
loop {
if self.is_shutting_down() {
break;
}
tokio::select! {
biased;
Ok(()) = self.shutdown_rx.changed() => {
if self.is_shutting_down() {
info!(worker_id=self.id, mode=?self.shutdown_rx.borrow().unwrap(), "Worker received shutdown signal.");
break; }
}
result = self.job_dispatch_rx.recv() => {
match result {
Ok((instance_id, lineage_id, scheduled_time)) => {
debug!(worker_id=self.id, %instance_id, %lineage_id, %scheduled_time, "Received job dispatch.");
let start_time = Instant::now(); let now_utc = Utc::now();
if now_utc >= scheduled_time {
let wait_duration = now_utc.signed_duration_since(scheduled_time);
match wait_duration.to_std() {
Ok(std_wait_duration) => {
self.metrics.job_queue_wait_duration.record(std_wait_duration);
trace!(worker_id=self.id, %instance_id, wait_ms = std_wait_duration.as_millis(), "Recorded queue wait time.");
}
Err(e) => {
warn!(worker_id=self.id, %instance_id, %scheduled_time, error=%e, "Failed to convert wait duration");
}
}
} else {
warn!(worker_id=self.id, %instance_id, %scheduled_time, "Job started *before* its scheduled time?");
self.metrics.job_queue_wait_duration.record(Duration::ZERO); }
let maybe_job_info = self.fetch_job_details(instance_id, lineage_id).await;
if let Some((exec_fn, request)) = maybe_job_info {
let job_span = tracing::span!(
tracing::Level::INFO, "job_exec",
worker_id = self.id,
%lineage_id,
%instance_id,
job_name = request.name.as_str()
);
self
.execute_and_handle(instance_id, lineage_id, request, exec_fn, start_time, scheduled_time)
.instrument(job_span)
.await;
} else {
error!(worker_id=self.id, %instance_id, %lineage_id, "Discarding dispatch due to fetch failure.");
}
},
Err(e) => {
if !self.is_shutting_down() {
error!(worker_id=self.id, "Job dispatch channel closed unexpectedly. Worker exiting. Error: {:?}", e);
} else {
info!(worker_id=self.id, "Job dispatch channel closed during shutdown. Worker exiting.");
}
break; }
} }
} }
info!(worker_id = self.id, "Worker task shutting down.");
}
fn is_shutting_down(&self) -> bool {
self.shutdown_rx.borrow().is_some()
}
async fn fetch_job_details(
&self,
instance_id: InstanceId,
lineage_id: TKJobId,
) -> Option<(Arc<BoxedExecFn>, TKJobRequest)> {
if let Some(def) = self.job_definitions.read().get(&lineage_id) {
return Some((def.exec_fn.clone(), def.request.clone()));
}
warn!(
worker_id = self.id,
%lineage_id,
%instance_id,
"Job definition not found for dispatched job!"
);
let outcome = WorkerOutcome::FetchFailed {
instance_id,
lineage_id,
};
if self.worker_outcome_tx.send(outcome).await.is_err() {
error!(
worker_id = self.id,
%lineage_id,
"Failed to send FetchFailed outcome for missing definition."
);
}
return None;
}
async fn execute_and_handle(
&self,
instance_id: InstanceId,
lineage_id: TKJobId,
request: TKJobRequest,
exec_fn: Arc<BoxedExecFn>,
job_start_instant: Instant,
scheduled_time: DateTime<Utc>,
) {
let execution_future = async move {
info!("Starting job execution.");
let exec_result = self.execute_job_logic(&exec_fn, lineage_id, instance_id).await;
let duration = job_start_instant.elapsed();
self.metrics.job_execution_duration.record(duration);
let success_str = match exec_result {
Ok(true) => "Success",
Ok(false) => "Fail",
Err(()) => "Panic",
};
info!(
duration_ms = duration.as_millis(),
outcome = success_str,
"Finished job execution."
);
self
.handle_job_result_outcome(instance_id, lineage_id, request, exec_result, scheduled_time)
.await;
};
let result = AssertUnwindSafe(execution_future).catch_unwind().await;
match result {
Ok(_) => {
let prev_count = self.active_workers_counter.fetch_sub(1, AtomicOrdering::Relaxed);
debug!(
worker_id = self.id,
prev_active = prev_count.saturating_sub(1),
"Decremented active worker count."
);
self
.metrics
.workers_active_current
.store(prev_count.saturating_sub(1), AtomicOrdering::Relaxed);
}
Err(panic_payload) => {
let panic_info = panic_payload
.downcast_ref::<&'static str>()
.map(|s| *s)
.or_else(|| panic_payload.downcast_ref::<String>().map(|s| s.as_str()))
.unwrap_or("Unknown panic payload")
.to_string();
error!(%lineage_id, %instance_id, %panic_info, "Worker caught a panic during job execution; recovering.");
self.metrics.jobs_panicked.fetch_add(1, AtomicOrdering::Relaxed);
let outcome = WorkerOutcome::Panic {
lineage_id,
completed_instance_id: instance_id,
panic_info,
};
if self.worker_outcome_tx.send(outcome).await.is_err() {
error!(%lineage_id, "Failed to send Panic outcome to coordinator (scheduler likely shutdown).");
}
let prev_count = self.active_workers_counter.fetch_sub(1, AtomicOrdering::Relaxed);
debug!(
worker_id = self.id,
prev_active = prev_count.saturating_sub(1),
"Decremented active worker count after catching panic."
);
self
.metrics
.workers_active_current
.store(prev_count.saturating_sub(1), AtomicOrdering::Relaxed);
}
}
}
async fn execute_job_logic(
&self,
exec_fn: &Arc<BoxedExecFn>,
lineage_id: TKJobId,
instance_id: InstanceId,
) -> Result<bool, ()> {
let func = exec_fn.clone();
let future_to_run = func();
#[cfg(feature = "job_context")]
let job_result_bool = {
let context = JobContext {
tk_job_id: lineage_id,
instance_id,
};
CURRENT_JOB_CONTEXT.scope(context, future_to_run).await
};
#[cfg(not(feature = "job_context"))]
let job_result_bool = future_to_run.await;
if job_result_bool {
self.metrics.jobs_executed_success.fetch_add(1, AtomicOrdering::Relaxed);
Ok(true)
} else {
self.metrics.jobs_executed_fail.fetch_add(1, AtomicOrdering::Relaxed);
Ok(false)
}
}
async fn handle_job_result_outcome(
&self,
instance_id: InstanceId, lineage_id: TKJobId,
original_request: TKJobRequest, result: Result<bool, ()>, scheduled_time: DateTime<Utc>,
) {
let should_retry = !matches!(result, Ok(true)); let current_retry_count = original_request.retry_count;
let outcome: WorkerOutcome;
if should_retry {
if current_retry_count < original_request.max_retries {
let next_retry_count = current_retry_count + 1;
let mut request_for_calc = original_request.clone(); request_for_calc.retry_count = next_retry_count; let next_run_time = request_for_calc.calculate_retry_time();
self.metrics.jobs_retried.fetch_add(1, AtomicOrdering::Relaxed);
info!(
worker_id = self.id, %lineage_id, %instance_id,
retry_attempt = next_retry_count,
max_retries = original_request.max_retries,
next_run = %next_run_time,
"Job failed/panicked, scheduling retry."
);
outcome = WorkerOutcome::Reschedule {
lineage_id,
completed_instance_id: instance_id,
completed_instance_scheduled_time: scheduled_time,
next_run_time,
updated_retry_count: next_retry_count, };
} else {
self
.metrics
.jobs_permanently_failed
.fetch_add(1, AtomicOrdering::Relaxed);
error!(
worker_id = self.id, %lineage_id, %instance_id,
retries = current_retry_count,
"Job failed permanently after exhausting retries."
);
let next_regular_run = original_request.calculate_next_run();
if let Some(next_run_time) = next_regular_run {
info!(
worker_id = self.id, %lineage_id, %instance_id,
next_run = %next_run_time,
"Scheduling next regular run after permanent failure."
);
outcome = WorkerOutcome::Reschedule {
lineage_id,
completed_instance_id: instance_id,
completed_instance_scheduled_time: scheduled_time,
next_run_time,
updated_retry_count: 0, };
} else {
info!(
worker_id = self.id, %lineage_id, %instance_id,
"Job failed permanently and has no further scheduled runs."
);
outcome = WorkerOutcome::Complete {
lineage_id,
completed_instance_id: instance_id,
is_permanent_failure: true,
};
}
}
} else {
let next_regular_run = original_request.schedule.calculate_next_run(scheduled_time);
if let Some(next_run_time) = next_regular_run {
info!(
worker_id = self.id, %lineage_id, %instance_id,
next_run = %next_run_time,
"Job succeeded, scheduling next run."
);
outcome = WorkerOutcome::Reschedule {
lineage_id,
completed_instance_id: instance_id,
completed_instance_scheduled_time: scheduled_time,
next_run_time,
updated_retry_count: 0, };
} else {
info!(
worker_id = self.id, %lineage_id, %instance_id,
"Job succeeded and has no further scheduled runs."
);
outcome = WorkerOutcome::Complete {
lineage_id,
completed_instance_id: instance_id,
is_permanent_failure: false,
};
}
}
debug!(
worker_id = self.id,
%lineage_id,
"Sending job outcome to coordinator."
);
if self.worker_outcome_tx.send(outcome).await.is_err() {
warn!(
worker_id = self.id,
%lineage_id,
"Failed to send job outcome to coordinator (scheduler likely shutdown)."
);
}
}
}