celers 0.2.0

Celery-compatible distributed task queue for Rust (Facade crate)
Documentation
use crate::{Chain, Group, Signature};
use serde_json::Value;

/// Creates a conditional workflow with success and failure branches
///
/// This creates a chain that includes both success and failure paths.
/// The actual branching logic must be implemented in the condition task.
///
/// # Arguments
///
/// * `condition_task` - Task that evaluates the condition
/// * `condition_args` - Arguments for the condition task
/// * `success_task` - Task to execute on success
/// * `success_args` - Arguments for success task
/// * `failure_task` - Task to execute on failure
/// * `failure_args` - Arguments for failure task
///
/// # Example
///
/// ```rust,ignore
/// use celers::advanced_patterns::create_conditional_workflow;
/// use serde_json::json;
///
/// let workflow = create_conditional_workflow(
///     "check_balance",
///     vec![json!({"account_id": 123})],
///     "process_payment",
///     vec![json!({"amount": 100})],
///     "send_insufficient_funds_notice",
///     vec![json!({"account_id": 123})]
/// );
/// ```
pub fn create_conditional_workflow(
    condition_task: &str,
    condition_args: Vec<Value>,
    success_task: &str,
    success_args: Vec<Value>,
    failure_task: &str,
    failure_args: Vec<Value>,
) -> Chain {
    // Create a chain with condition task
    // The condition task should route to success or failure based on its result
    let mut chain = Chain::new();
    chain = chain.then(condition_task, condition_args);
    chain = chain.then(success_task, success_args);
    // Note: Failure path would be implemented via task routing/error handling
    // This is a template for the workflow structure
    let _ = failure_task;
    let _ = failure_args;
    chain
}

/// Creates a dynamic workflow where tasks are generated at runtime
///
/// This pattern allows for workflows where the number and type of tasks
/// are determined dynamically based on input data.
///
/// # Arguments
///
/// * `generator_task` - Task that generates the list of tasks to execute
/// * `generator_args` - Arguments for the generator task
/// * `executor_task` - Task that executes the generated tasks
///
/// # Example
///
/// ```rust,ignore
/// use celers::advanced_patterns::create_dynamic_workflow;
/// use serde_json::json;
///
/// let workflow = create_dynamic_workflow(
///     "generate_tasks",
///     vec![json!({"rules": "config.json"})],
///     "execute_task"
/// );
/// ```
pub fn create_dynamic_workflow(
    generator_task: &str,
    generator_args: Vec<Value>,
    executor_task: &str,
) -> Chain {
    Chain::new()
        .then(generator_task, generator_args)
        .then(executor_task, vec![])
}

/// Creates a workflow with parallel sub-chains
///
/// This pattern executes multiple chains in parallel, useful for
/// independent workflows that should run concurrently.
///
/// # Arguments
///
/// * `chains` - List of (chain_name, tasks) tuples
/// * `aggregate_task` - Optional task to aggregate results from all chains
///
/// # Example
///
/// ```rust,ignore
/// use celers::advanced_patterns::create_parallel_chains;
/// use serde_json::json;
///
/// let chains = vec![
///     ("process_images", vec![("resize", vec![]), ("optimize", vec![])]),
///     ("process_videos", vec![("transcode", vec![]), ("thumbnail", vec![])]),
/// ];
///
/// let workflow = create_parallel_chains(chains, Some("finalize"));
/// ```
#[allow(clippy::type_complexity)]
pub fn create_parallel_chains(
    chains: Vec<(&str, Vec<(&str, Vec<Value>)>)>,
    aggregate_task: Option<&str>,
) -> Group {
    let mut group = Group::new();

    for (_chain_name, tasks) in chains {
        // Create a signature for the first task in each chain
        if let Some((first_task, first_args)) = tasks.first() {
            let sig = Signature::new(first_task.to_string()).with_args(first_args.clone());
            group.tasks.push(sig);
        }
    }

    // Note: Aggregate task would be added as a chord if provided
    let _ = aggregate_task;

    group
}

/// Creates a saga pattern workflow for distributed transactions
///
/// Implements the saga pattern with compensating transactions for each step.
///
/// # Arguments
///
/// * `steps` - List of (forward_task, forward_args, compensate_task, compensate_args) tuples
///
/// # Example
///
/// ```rust,ignore
/// use celers::advanced_patterns::create_saga_workflow;
/// use serde_json::json;
///
/// let steps = vec![
///     ("reserve_inventory", vec![json!(1)], "release_inventory", vec![json!(1)]),
///     ("charge_payment", vec![json!(2)], "refund_payment", vec![json!(2)]),
///     ("ship_order", vec![json!(3)], "cancel_shipment", vec![json!(3)]),
/// ];
///
/// let workflow = create_saga_workflow(steps);
/// ```
pub fn create_saga_workflow(steps: Vec<(&str, Vec<Value>, &str, Vec<Value>)>) -> Chain {
    let mut chain = Chain::new();

    // Add forward tasks
    for (forward_task, forward_args, _compensate_task, _compensate_args) in steps {
        chain = chain.then(forward_task, forward_args);
        // Note: Compensation tasks would be invoked on failure
        // This requires error handling logic in the task implementation
    }

    chain
}