use chrono::{DateTime, Utc};
use opentelemetry::KeyValue;
use sqlx::PgPool;
use std::time::Duration;
use tokio::time::interval;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use tasker_shared::config::tasker::{BatchProcessingConfig, StalenessDetectionConfig};
use tasker_shared::database::sql_functions::SqlFunctionExecutor;
use tasker_shared::errors::TaskerResult;
use tasker_shared::metrics::orchestration;
use tasker_shared::models::orchestration::StalenessAction;
/// One row of output from the `detect_and_transition_stale_tasks` SQL
/// function: a task that exceeded its staleness threshold, plus the action
/// taken (or simulated, when running in dry-run mode) for it.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct StalenessResult {
    /// Task that was detected as stale.
    pub task_uuid: Uuid,
    /// Namespace the task belongs to.
    pub namespace_name: String,
    /// Name of the task.
    pub task_name: String,
    /// State the task was in when detected (e.g. "waiting_for_dependencies").
    pub current_state: String,
    /// How long the task has been sitting in `current_state`.
    pub time_in_state_minutes: i32,
    /// The per-state threshold that was exceeded.
    pub staleness_threshold_minutes: i32,
    /// Action the SQL function took (or would take, in dry-run mode).
    pub action_taken: StalenessAction,
    /// Whether a dead-letter-queue entry was created for the task.
    pub moved_to_dlq: bool,
    /// Whether the transition to the error state succeeded.
    pub transition_success: bool,
}
/// Background service that periodically invokes the
/// `detect_and_transition_stale_tasks` SQL function, applies a batch-worker
/// checkpoint-health exemption, and records metrics about the outcome.
#[derive(Clone)]
pub struct StalenessDetector {
    // Executor wrapping the Postgres pool; used for all SQL calls.
    executor: SqlFunctionExecutor,
    // Detection interval, per-state thresholds, batch size, dry-run switch.
    config: StalenessDetectionConfig,
    // Batch-processing settings; drives the checkpoint-health filtering.
    batch_config: BatchProcessingConfig,
}
/// Manual `Debug`: the `SqlFunctionExecutor` (and its connection pool) is
/// elided from the output, which `finish_non_exhaustive` signals with `..`.
impl std::fmt::Debug for StalenessDetector {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("StalenessDetector");
        builder.field("config", &self.config);
        builder.field("batch_config", &self.batch_config);
        builder.finish_non_exhaustive()
    }
}
impl StalenessDetector {
/// Build a detector that runs SQL through the given pool with the supplied
/// staleness and batch-processing configuration.
#[must_use]
pub fn new(
    pool: PgPool,
    config: StalenessDetectionConfig,
    batch_config: BatchProcessingConfig,
) -> Self {
    Self {
        executor: SqlFunctionExecutor::new(pool),
        config,
        batch_config,
    }
}
/// Main detection loop: wake up every `detection_interval_seconds`, run one
/// staleness pass, and record cycle metrics. Runs forever — a failed cycle
/// is logged and the loop simply waits for the next tick.
pub async fn run(&self) -> TaskerResult<()> {
    let mut ticker = interval(Duration::from_secs(
        self.config.detection_interval_seconds as u64,
    ));
    info!(
        enabled = self.config.enabled,
        interval_seconds = self.config.detection_interval_seconds,
        batch_size = self.config.batch_size,
        "Starting staleness detector"
    );
    orchestration::staleness_detection_runs_total()
        .add(1, &[KeyValue::new("dry_run", self.config.dry_run)]);
    loop {
        ticker.tick().await;
        let cycle_start = std::time::Instant::now();
        match self.detect_and_transition_stale_tasks().await {
            Err(e) => {
                error!(error = %e, "Staleness detection cycle failed");
            }
            Ok(results) => {
                let detected = results.len();
                let dlq_count = results.iter().filter(|r| r.moved_to_dlq).count();
                let transition_count =
                    results.iter().filter(|r| r.transition_success).count();
                let elapsed_ms = cycle_start.elapsed().as_millis() as f64;
                orchestration::staleness_detection_duration().record(
                    elapsed_ms,
                    &[
                        KeyValue::new("dry_run", self.config.dry_run),
                        KeyValue::new("tasks_detected", detected as i64),
                    ],
                );
                if detected == 0 {
                    debug!("No stale tasks detected this cycle");
                } else {
                    info!(
                        total = detected,
                        moved_to_dlq = dlq_count,
                        transitioned = transition_count,
                        dry_run = self.config.dry_run,
                        duration_ms = elapsed_ms,
                        "Staleness detection completed"
                    );
                    self.record_detection_metrics(&results);
                }
            }
        }
    }
}
async fn detect_and_transition_stale_tasks(&self) -> TaskerResult<Vec<StalenessResult>> {
let dry_run = self.config.dry_run;
let batch_size = self.config.batch_size;
let waiting_deps_threshold = self.config.thresholds.waiting_for_dependencies_minutes;
let waiting_retry_threshold = self.config.thresholds.waiting_for_retry_minutes;
let steps_in_process_threshold = self.config.thresholds.steps_in_process_minutes;
let max_lifetime_hours = self.config.thresholds.task_max_lifetime_hours;
debug!(
dry_run = dry_run,
batch_size = batch_size,
"Calling detect_and_transition_stale_tasks SQL function via SqlFunctionExecutor"
);
let db_results = self
.executor
.detect_and_transition_stale_tasks(
dry_run,
batch_size as i32,
waiting_deps_threshold as i32,
waiting_retry_threshold as i32,
steps_in_process_threshold as i32,
max_lifetime_hours as i32,
)
.await?;
let mut results: Vec<StalenessResult> = db_results
.into_iter()
.map(|r| StalenessResult {
task_uuid: r.task_uuid,
namespace_name: r.namespace_name,
task_name: r.task_name,
current_state: r.current_state,
time_in_state_minutes: r.time_in_state_minutes,
staleness_threshold_minutes: r.staleness_threshold_minutes,
action_taken: r.action_taken.parse().unwrap(), moved_to_dlq: r.moved_to_dlq,
transition_success: r.transition_success,
})
.collect();
if self.batch_config.enabled && self.batch_config.checkpoint_stall_minutes > 0 {
let initial_count = results.len();
let mut filtered_results = Vec::with_capacity(results.len());
for result in results {
match self.should_task_be_stale(result.task_uuid).await {
Ok(is_stale) if is_stale => {
filtered_results.push(result);
}
Ok(_) => {
debug!(
task_uuid = %result.task_uuid,
"Task filtered out - batch workers have healthy checkpoints"
);
}
Err(e) => {
warn!(
task_uuid = %result.task_uuid,
error = %e,
"Error checking batch worker health, including task in staleness results"
);
filtered_results.push(result);
}
}
}
results = filtered_results;
let filtered_count = initial_count - results.len();
if filtered_count > 0 {
info!(
initial_count = initial_count,
filtered_count = filtered_count,
final_count = results.len(),
"Batch worker checkpoint health filtering applied"
);
}
}
Ok(results)
}
/// Emit per-task counters and debug logs for one detection cycle's results,
/// warn if any transitions failed, then kick off a background refresh of the
/// DLQ pending-investigations gauge.
fn record_detection_metrics(&self, results: &[StalenessResult]) {
    for result in results {
        // Bucket time-in-state to keep metric label cardinality low.
        let time_bucket = match result.time_in_state_minutes {
            m if m <= 120 => "60-120",
            m if m <= 360 => "120-360",
            _ => ">360",
        };
        orchestration::stale_tasks_detected_total().add(
            1,
            &[
                KeyValue::new("state", result.current_state.clone()),
                KeyValue::new("time_in_state_minutes", time_bucket.to_string()),
            ],
        );
        if result.moved_to_dlq {
            orchestration::dlq_entries_created_total().add(
                1,
                &[
                    KeyValue::new("dlq_reason", "staleness_timeout"),
                    KeyValue::new("original_state", result.current_state.clone()),
                ],
            );
        }
        if result.transition_success {
            orchestration::tasks_transitioned_to_error_total().add(
                1,
                &[
                    KeyValue::new("original_state", result.current_state.clone()),
                    KeyValue::new("reason", "staleness_timeout"),
                ],
            );
        }
        debug!(
            task_uuid = %result.task_uuid,
            namespace = %result.namespace_name,
            task_name = %result.task_name,
            state = %result.current_state,
            time_in_state_min = result.time_in_state_minutes,
            threshold_min = result.staleness_threshold_minutes,
            action = %result.action_taken,
            moved_to_dlq = result.moved_to_dlq,
            transition_success = result.transition_success,
            "Stale task detected and processed"
        );
    }
    let failures = results
        .iter()
        .filter(|r| r.action_taken.is_failure())
        .count();
    if failures > 0 {
        warn!(
            failures = failures,
            total = results.len(),
            "Some stale task transitions failed"
        );
    }
    self.update_pending_investigations_gauge();
}
/// Refresh the DLQ pending-investigations gauge in the background.
///
/// Spawned as a fire-and-forget task so the metric update never blocks the
/// detection loop; acquisition or query failures are logged and ignored.
fn update_pending_investigations_gauge(&self) {
    let executor = self.executor.clone();
    tokio::spawn(async move {
        // Acquire a dedicated connection; bail out quietly on failure.
        let mut conn = match executor.pool().acquire().await {
            Ok(conn) => conn,
            Err(e) => {
                warn!(
                    error = %e,
                    "Failed to acquire database connection for pending DLQ gauge update"
                );
                return;
            }
        };
        let pending_query = sqlx::query_scalar::<_, i64>(
            "SELECT COUNT(*) FROM tasker.tasks_dlq WHERE resolution_status = 'pending'",
        );
        match pending_query.fetch_one(&mut *conn).await {
            Ok(pending_count) => {
                orchestration::dlq_pending_investigations()
                    .record(pending_count as u64, &[]);
                debug!(
                    pending_investigations = pending_count,
                    "Updated DLQ pending investigations gauge"
                );
            }
            Err(e) => {
                warn!(
                    error = %e,
                    "Failed to query pending DLQ count for gauge update"
                );
            }
        }
    });
}
/// Read-only access to the staleness-detection configuration.
#[must_use]
pub const fn config(&self) -> &StalenessDetectionConfig {
    &self.config
}
/// A step's results identify a batch worker when the JSON object contains a
/// `batch_cursor` key — regardless of that key's value (even `null`).
/// Non-object values (null, arrays, strings) are never batch workers.
fn is_batch_worker(&self, results: &serde_json::Value) -> bool {
    // `Value::get` returns None for non-objects and missing keys alike.
    results.get("batch_cursor").is_some()
}
/// Check whether a batch worker's last checkpoint is recent enough.
///
/// Returns `true` for non-batch workers (no `batch_cursor` key) — they are
/// outside the scope of this check. Returns `false` when `last_checkpoint`
/// is missing, not a string, unparseable as RFC 3339, or strictly older
/// than `checkpoint_stall_minutes`.
fn is_batch_worker_checkpoint_healthy(&self, results: &serde_json::Value) -> bool {
    // Not a batch worker -> trivially healthy for this check.
    let Some(batch_cursor) = results.as_object().and_then(|obj| obj.get("batch_cursor"))
    else {
        return true;
    };
    // `last_checkpoint` must exist and be a string.
    let Some(checkpoint_str) = batch_cursor
        .as_object()
        .and_then(|cursor| cursor.get("last_checkpoint"))
        .and_then(|v| v.as_str())
    else {
        debug!("Batch worker missing last_checkpoint timestamp");
        return false;
    };
    // ... and parse as an RFC 3339 timestamp.
    let last_checkpoint = match DateTime::parse_from_rfc3339(checkpoint_str) {
        Ok(dt) => dt.with_timezone(&Utc),
        Err(e) => {
            warn!(error = %e, "Failed to parse batch worker checkpoint timestamp");
            return false;
        }
    };
    let elapsed = Utc::now() - last_checkpoint;
    let stall_threshold =
        chrono::Duration::minutes(self.batch_config.checkpoint_stall_minutes as i64);
    // Strictly-greater comparison: exactly-at-threshold still counts healthy.
    let stalled = elapsed > stall_threshold;
    if stalled {
        warn!(
            elapsed_minutes = elapsed.num_minutes(),
            threshold_minutes = stall_threshold.num_minutes(),
            "Batch worker stalled - no recent checkpoint"
        );
    }
    !stalled
}
/// Decide whether a task flagged stale by the SQL detection should really be
/// treated as stale, given batch-worker checkpoint health.
///
/// Returns `Ok(false)` only when the task's in-progress steps include at
/// least one batch worker AND every batch worker has a healthy (recent)
/// checkpoint — such tasks are actively progressing and are exempted from
/// staleness. In every other case (no in-progress steps, no batch workers,
/// or any stalled batch worker) the SQL verdict stands and `Ok(true)` is
/// returned.
///
/// # Errors
/// Returns an error when the step query fails.
async fn should_task_be_stale(&self, task_uuid: Uuid) -> TaskerResult<bool> {
    // Fetch the results JSON of every currently in-progress step.
    let steps = sqlx::query!(
        r#"
        SELECT ws.workflow_step_uuid, ws.results
        FROM tasker.workflow_steps ws
        INNER JOIN tasker.workflow_step_transitions wst
            ON ws.workflow_step_uuid = wst.workflow_step_uuid
        WHERE ws.task_uuid = $1
        AND wst.most_recent = true
        AND wst.to_state = 'in_progress'
        "#,
        task_uuid
    )
    .fetch_all(self.executor.pool())
    .await?;

    // No in-progress steps: nothing to exempt, keep the SQL staleness verdict.
    if steps.is_empty() {
        return Ok(true);
    }

    let mut healthy_batch_worker_found = false;
    for step in steps {
        let results = step.results.unwrap_or_default();
        if self.is_batch_worker(&results) {
            if self.is_batch_worker_checkpoint_healthy(&results) {
                healthy_batch_worker_found = true;
            } else {
                debug!(
                    task_uuid = %task_uuid,
                    step_uuid = %step.workflow_step_uuid,
                    "Batch worker failed checkpoint health check"
                );
                return Ok(true);
            }
        }
    }

    // BUGFIX: this previously returned Ok(true) unconditionally, which made
    // the caller's "healthy batch worker" exemption dead code. A task whose
    // batch workers all have healthy checkpoints is not stale.
    Ok(!healthy_batch_worker_found)
}
}
#[cfg(test)]
mod tests {
use super::*;
// Construction and accessor tests. `connect_lazy` builds a pool without
// attempting a connection, so no live database is required here.
#[tokio::test]
async fn test_staleness_detector_creation() {
    let pool = PgPool::connect_lazy("postgresql://test").expect("Should create lazy pool");
    let config = StalenessDetectionConfig::default();
    let batch_config = BatchProcessingConfig::default();
    let detector = StalenessDetector::new(pool, config.clone(), batch_config);
    // The stored config should round-trip through the getter unchanged.
    assert_eq!(detector.config().enabled, config.enabled);
    assert_eq!(
        detector.config().detection_interval_seconds,
        config.detection_interval_seconds
    );
}
// Smoke test: StalenessResult fields and StalenessAction predicates agree.
#[test]
fn test_staleness_result_structure() {
    let result = StalenessResult {
        task_uuid: Uuid::new_v4(),
        namespace_name: "test_namespace".to_string(),
        task_name: "test_task".to_string(),
        current_state: "waiting_for_dependencies".to_string(),
        time_in_state_minutes: 120,
        staleness_threshold_minutes: 60,
        action_taken: StalenessAction::TransitionedToDlqAndError,
        moved_to_dlq: true,
        transition_success: true,
    };
    assert!(result.moved_to_dlq);
    assert!(result.transition_success);
    assert_eq!(result.time_in_state_minutes, 120);
    assert_eq!(
        result.action_taken,
        StalenessAction::TransitionedToDlqAndError
    );
    assert!(!result.action_taken.is_failure());
    assert!(result.action_taken.dlq_created());
    assert!(result.action_taken.transition_succeeded());
}
#[tokio::test]
async fn test_staleness_detector_config_getter() {
    let pool = PgPool::connect_lazy("postgresql://test").expect("Should create lazy pool");
    let config = StalenessDetectionConfig::default();
    let batch_config = BatchProcessingConfig::default();
    let detector = StalenessDetector::new(pool, config.clone(), batch_config);
    assert_eq!(detector.config().batch_size, config.batch_size);
}
// The manual Debug impl should print both configs but elide the executor.
#[tokio::test]
async fn test_staleness_detector_debug_impl() {
    let pool = PgPool::connect_lazy("postgresql://test").expect("lazy pool");
    let config = StalenessDetectionConfig::default();
    let batch_config = BatchProcessingConfig::default();
    let detector = StalenessDetector::new(pool, config, batch_config);
    let debug_output = format!("{detector:?}");
    assert!(debug_output.contains("StalenessDetector"));
    assert!(debug_output.contains("config"));
    assert!(debug_output.contains("batch_config"));
}
#[test]
fn test_staleness_result_clone() {
    let result = StalenessResult {
        task_uuid: Uuid::new_v4(),
        namespace_name: "ns".to_string(),
        task_name: "task".to_string(),
        current_state: "steps_in_process".to_string(),
        time_in_state_minutes: 45,
        staleness_threshold_minutes: 30,
        action_taken: StalenessAction::TransitionFailed,
        moved_to_dlq: false,
        transition_success: false,
    };
    let cloned = result.clone();
    assert_eq!(cloned.task_uuid, result.task_uuid);
    assert_eq!(cloned.namespace_name, result.namespace_name);
    assert!(cloned.action_taken.is_failure());
}
// Exhaustive table of StalenessAction variants. Tuple order:
// (action, is_failure, dlq_created, transition_succeeded).
#[test]
fn test_staleness_result_all_action_variants() {
    let actions = [
        (
            StalenessAction::WouldTransitionToDlqAndError,
            false,
            false,
            false,
        ),
        (
            StalenessAction::TransitionedToDlqAndError,
            false,
            true,
            true,
        ),
        (StalenessAction::MovedToDlqOnly, true, true, false),
        (StalenessAction::TransitionedToErrorOnly, true, false, true),
        (StalenessAction::TransitionFailed, true, false, false),
    ];
    for (action, is_failure, dlq_created, transition_succeeded) in actions {
        assert_eq!(
            action.is_failure(),
            is_failure,
            "is_failure mismatch for {action:?}"
        );
        assert_eq!(
            action.dlq_created(),
            dlq_created,
            "dlq_created mismatch for {action:?}"
        );
        assert_eq!(
            action.transition_succeeded(),
            transition_succeeded,
            "transition_succeeded mismatch for {action:?}"
        );
    }
}
// Test helper: detector with batch processing enabled and the given
// checkpoint stall threshold, backed by a lazy (unconnected) pool.
fn make_detector_with_stall_minutes(stall_minutes: u32) -> StalenessDetector {
    let pool = PgPool::connect_lazy("postgresql://test").expect("lazy pool");
    let config = StalenessDetectionConfig::default();
    let batch_config = BatchProcessingConfig {
        enabled: true,
        checkpoint_stall_minutes: stall_minutes,
        ..Default::default()
    };
    StalenessDetector::new(pool, config, batch_config)
}
// is_batch_worker: detection is purely "does the results object have a
// `batch_cursor` key" — value (even null) does not matter.
#[tokio::test]
async fn test_is_batch_worker_with_batch_cursor() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": {
            "offset": 0,
            "limit": 100,
            "last_checkpoint": "2026-01-31T10:00:00Z"
        }
    });
    assert!(detector.is_batch_worker(&results));
}
#[tokio::test]
async fn test_is_batch_worker_without_batch_cursor() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "status": "complete",
        "output": {"key": "value"}
    });
    assert!(!detector.is_batch_worker(&results));
}
#[tokio::test]
async fn test_is_batch_worker_empty_object() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({});
    assert!(!detector.is_batch_worker(&results));
}
// Non-object results values are never batch workers.
#[tokio::test]
async fn test_is_batch_worker_null_value() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::Value::Null;
    assert!(!detector.is_batch_worker(&results));
}
#[tokio::test]
async fn test_is_batch_worker_array_value() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!([1, 2, 3]);
    assert!(!detector.is_batch_worker(&results));
}
// Key presence alone counts — a null batch_cursor is still a batch worker.
#[tokio::test]
async fn test_is_batch_worker_null_batch_cursor() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({ "batch_cursor": null });
    assert!(detector.is_batch_worker(&results));
}
// Checkpoint health: 5 minutes old is inside the 15-minute threshold.
#[tokio::test]
async fn test_checkpoint_healthy_recent_timestamp() {
    let detector = make_detector_with_stall_minutes(15);
    let recent = Utc::now() - chrono::Duration::minutes(5);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": recent.to_rfc3339()
        }
    });
    assert!(detector.is_batch_worker_checkpoint_healthy(&results));
}
// 30 minutes old is past the 15-minute threshold -> stalled.
#[tokio::test]
async fn test_checkpoint_unhealthy_stale_timestamp() {
    let detector = make_detector_with_stall_minutes(15);
    let stale = Utc::now() - chrono::Duration::minutes(30);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": stale.to_rfc3339()
        }
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// Renamed from `test_checkpoint_healthy_exactly_at_threshold`: the input is
// 14 minutes against a 15-minute threshold, i.e. just UNDER it. (An
// exactly-at-threshold timestamp cannot be tested reliably because wall-clock
// time advances between constructing the timestamp and the `Utc::now()` call
// inside the health check, and the comparison is strictly greater-than.)
#[tokio::test]
async fn test_checkpoint_healthy_just_under_threshold() {
    let detector = make_detector_with_stall_minutes(15);
    let at_threshold = Utc::now() - chrono::Duration::minutes(14);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": at_threshold.to_rfc3339()
        }
    });
    assert!(detector.is_batch_worker_checkpoint_healthy(&results));
}
// A batch worker with no last_checkpoint at all is treated as stalled.
#[tokio::test]
async fn test_checkpoint_unhealthy_missing_last_checkpoint() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": {
            "offset": 0,
            "limit": 100
        }
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// An unparseable timestamp string is treated as stalled.
#[tokio::test]
async fn test_checkpoint_unhealthy_invalid_timestamp() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": "not-a-valid-timestamp"
        }
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// Only string timestamps are accepted; a numeric epoch is rejected.
#[tokio::test]
async fn test_checkpoint_unhealthy_numeric_timestamp() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": 1706745600
        }
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// Non-batch workers are vacuously healthy for this check.
#[tokio::test]
async fn test_checkpoint_healthy_no_batch_cursor() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({ "status": "ok" });
    assert!(detector.is_batch_worker_checkpoint_healthy(&results));
}
#[tokio::test]
async fn test_checkpoint_unhealthy_null_last_checkpoint() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": null
        }
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// The same 30-minute-old checkpoint is healthy against a 60-minute
// threshold but stalled against a 1-minute threshold.
#[tokio::test]
async fn test_checkpoint_with_different_stall_thresholds() {
    let detector = make_detector_with_stall_minutes(60);
    let thirty_min_ago = Utc::now() - chrono::Duration::minutes(30);
    let results = serde_json::json!({
        "batch_cursor": {
            "last_checkpoint": thirty_min_ago.to_rfc3339()
        }
    });
    assert!(detector.is_batch_worker_checkpoint_healthy(&results));
    let strict_detector = make_detector_with_stall_minutes(1);
    assert!(!strict_detector.is_batch_worker_checkpoint_healthy(&results));
}
// Renamed from `test_checkpoint_healthy_batch_cursor_is_not_object`: the
// assertion expects UNhealthy — a non-object batch_cursor has no readable
// last_checkpoint, so the health check fails.
#[tokio::test]
async fn test_checkpoint_unhealthy_batch_cursor_is_not_object() {
    let detector = make_detector_with_stall_minutes(15);
    let results = serde_json::json!({
        "batch_cursor": "just-a-string"
    });
    assert!(!detector.is_batch_worker_checkpoint_healthy(&results));
}
// Mirrors the time-bucket logic in record_detection_metrics; boundaries are
// inclusive on the upper edge of each bucket.
#[test]
fn test_time_bucket_classification() {
    let classify = |minutes: i32| -> &'static str {
        if minutes <= 120 {
            "60-120"
        } else if minutes <= 360 {
            "120-360"
        } else {
            ">360"
        }
    };
    assert_eq!(classify(60), "60-120");
    assert_eq!(classify(120), "60-120");
    assert_eq!(classify(121), "120-360");
    assert_eq!(classify(360), "120-360");
    assert_eq!(classify(361), ">360");
    assert_eq!(classify(1440), ">360");
}
}