use crate::{Result, Error};
use crate::storage::{Keys, RedisClient};
use fred::prelude::{RedisKey, RedisValue};
/// Records that `task_id` must wait for every task listed in `deps`.
///
/// For each dependency two set entries are written, in this order:
/// the dependency id is added to the pending-dependencies set of `task_id`,
/// then `task_id` is added to the dependents set of that dependency.
/// With an empty `deps` slice nothing is written and nothing is logged.
///
/// # Errors
///
/// Propagates the first Redis error encountered; entries written before the
/// failure are not rolled back.
pub async fn register(redis: &RedisClient, task_id: &str, deps: &[String]) -> Result<()> {
    if !deps.is_empty() {
        let waiting_on_key: RedisKey = Keys::pending_deps(task_id).into();

        for dependency in deps.iter() {
            // Forward link: what this task is still waiting for.
            redis
                .sadd(waiting_on_key.clone(), dependency.as_str().into())
                .await?;

            // Reverse link: who has to be notified when the dependency finishes.
            let dependents_key: RedisKey = Keys::task_deps(dependency).into();
            redis.sadd(dependents_key, task_id.into()).await?;
        }

        tracing::debug!("Task {} registered with {} dependencies", task_id, deps.len());
    }

    Ok(())
}
pub async fn check_dependents(redis: &RedisClient, completed_task_id: &str) -> Result<usize> {
let task_deps_key: RedisKey = Keys::task_deps(completed_task_id).into();
let dependents = redis.smembers(task_deps_key.clone()).await?;
if dependents.is_empty() {
return Ok(0);
}
tracing::debug!("Task {} has {} dependent tasks", completed_task_id, dependents.len());
let mut enqueued_count = 0;
let mut failed_count = 0;
for dependent_id in dependents {
let dependent_id_str = dependent_id.as_str().to_string();
let pending_deps_key: RedisKey = Keys::pending_deps(&dependent_id_str).into();
redis.srem(pending_deps_key.clone(), completed_task_id.into()).await?;
let remaining_deps: u64 = redis.scard(pending_deps_key.clone()).await?;
if remaining_deps == 0 {
let task_key: RedisKey = Keys::task(&dependent_id_str).into();
match redis.hget(task_key.clone(), "queue".into()).await? {
Some(queue_value) => {
let queue_name = match queue_value.as_bytes() {
Some(bytes) => std::str::from_utf8(bytes)
.map_err(|e| Error::Serialization(format!("Invalid queue name: {}", e)))?,
None => {
tracing::warn!("Task {} has invalid queue field, skipping", dependent_id);
failed_count += 1;
let _ = redis.del(vec![pending_deps_key]).await;
continue;
}
};
let queue_key: RedisKey = Keys::queue(queue_name).into();
match redis.rpush(queue_key, dependent_id.as_str().into()).await {
Ok(_) => {
tracing::info!("Task {} enqueued to queue '{}' after all dependencies satisfied", dependent_id, queue_name);
let _ = redis.del(vec![pending_deps_key]).await;
enqueued_count += 1;
}
Err(e) => {
tracing::error!("Failed to enqueue dependent task {}: {}", dependent_id, e);
failed_count += 1;
}
}
}
None => {
tracing::warn!("Task {} not found, skipping", dependent_id);
failed_count += 1;
let _ = redis.del(vec![pending_deps_key]).await;
}
}
} else {
tracing::debug!("Task {} still has {} pending dependencies", dependent_id, remaining_deps);
}
}
redis.del(vec![task_deps_key]).await?;
if failed_count > 0 {
tracing::warn!("{} dependent tasks failed to enqueue, may require manual retry", failed_count);
}
Ok(enqueued_count)
}
pub async fn fail_dependents(redis: &RedisClient, failed_task_id: &str, queue: &str) -> Result<usize> {
let task_deps_key: RedisKey = Keys::task_deps(failed_task_id).into();
let dependents = redis.smembers(task_deps_key.clone()).await?;
if dependents.is_empty() {
return Ok(0);
}
tracing::warn!(
"Task {} failed permanently, failing {} dependent tasks",
failed_task_id,
dependents.len()
);
let mut failed_count = 0;
for dependent_id in &dependents {
let dependent_id_str = dependent_id.as_str();
let task_key: RedisKey = Keys::task(dependent_id_str).into();
match redis.hget(task_key.clone(), "data".into()).await? {
Some(data) => {
if let Some(bytes) = data.as_bytes() {
match rmp_serde::from_slice::<crate::Task>(bytes) {
Ok(mut task) => {
task.status = crate::task::TaskStatus::Dead;
task.last_error = Some(format!(
"Dependency task {} failed permanently",
failed_task_id
));
match rmp_serde::to_vec(&task) {
Ok(new_data) => {
if let Err(e) = redis
.hset(
task_key.clone(),
vec![
("data".into(), RedisValue::Bytes(new_data.into())),
("queue".into(), queue.into()),
],
)
.await
{
tracing::error!(
"Failed to update dependent task {}: {}",
dependent_id_str,
e
);
continue;
}
let dead_key: RedisKey = Keys::dead(queue).into();
if let Err(e) = redis
.lpush(dead_key, dependent_id.as_str().into())
.await
{
tracing::error!(
"Failed to add dependent task {} to dead queue: {}",
dependent_id_str,
e
);
continue;
}
tracing::info!(
"Dependent task {} moved to dead queue due to failed dependency {}",
dependent_id_str,
failed_task_id
);
failed_count += 1;
}
Err(e) => {
tracing::error!(
"Failed to serialize dependent task {}: {}",
dependent_id_str,
e
);
}
}
}
Err(e) => {
tracing::error!(
"Failed to deserialize dependent task {}: {}",
dependent_id_str,
e
);
}
}
}
}
None => {
tracing::warn!("Dependent task {} data not found, skipping", dependent_id_str);
}
}
let pending_deps_key: RedisKey = Keys::pending_deps(dependent_id_str).into();
let _ = redis.del(vec![pending_deps_key]).await;
}
let _ = redis.del(vec![task_deps_key]).await;
Ok(failed_count)
}