# torc 0.21.0
#
# Workflow management system
using Torc
import Torc: APIClient

"""
    process_mapped_function_cli_args(run_job_function, run_postprocess_function)

Command-line entry point for a script that torc invokes as a mapped-function job.

`ARGS` must contain exactly two values: the database URL and a command, which is
either `run` or `postprocess`. The workflow and job IDs are read from the
`TORC_WORKFLOW_ID` and `TORC_JOB_ID` environment variables.

The user data items consumed by this job are fetched through
`APIClient.list_user_data` and handed to `run_job_function` (command `run`) or
`run_postprocess_function` (command `postprocess`). If the called function returns
something other than `nothing`, the value is stored with `_store_result`.

# Arguments
- `run_job_function::Function`: called for the `run` command.
- `run_postprocess_function::Union{Function, Nothing}`: called for the `postprocess`
  command; may be `nothing` when the script is never run with that command.

# Throws
- `ErrorException` if the argument count is wrong, the command is not recognized,
  `postprocess` is requested without a postprocess function, or one of the required
  environment variables is missing.
"""
function process_mapped_function_cli_args(
    run_job_function::Function,
    run_postprocess_function::Union{Function, Nothing},
)
    if length(ARGS) != 2
        error("Usage: $(PROGRAM_FILE) URL run|postprocess")
    end
    url, command = ARGS

    # Reject bad invocations before touching the environment or the API. Otherwise an
    # unknown command would leave `result` unassigned and surface later as an
    # UndefVarError, after the API call had already been made.
    if !(command in ("run", "postprocess"))
        error("Invalid command: $(command). Usage: $(PROGRAM_FILE) URL run|postprocess")
    end
    if command == "postprocess" && isnothing(run_postprocess_function)
        error("The postprocess command was requested but no postprocess function was provided")
    end

    # These environment variables get set by the torc job runner.
    for key in ("TORC_WORKFLOW_ID", "TORC_JOB_ID")
        !haskey(ENV, key) && error("Bug: environment variable $key is not defined")
    end
    workflow_id = parse(Int64, ENV["TORC_WORKFLOW_ID"])
    job_id = parse(Int64, ENV["TORC_JOB_ID"])

    api = make_api(url)
    response = send_api_command(
        api,
        APIClient.list_user_data,
        workflow_id;
        consumer_job_id = job_id,
    )
    items = response.items

    # `command` was validated above, so exactly one of these two branches applies.
    result = if command == "run"
        _run_function(items, run_job_function)
    else
        _postprocess_function(items, run_postprocess_function)
    end

    # A function that returns `nothing` has no result to store.
    if !isnothing(result)
        _store_result(api, workflow_id, job_id, result)
    end
    return nothing
end

"""
    _run_function(input_items, run_job_function)

Call `run_job_function` with the `"params"` entry from the `data` of the single
input item and return whatever it returns.

Throws an `ErrorException` unless `input_items` contains exactly one element.
"""
function _run_function(input_items::Vector, run_job_function::Function)
    if length(input_items) != 1
        error("Bug: unexpected input data: $(input_items)")
    end
    params = first(input_items).data["params"]
    return run_job_function(params)
end

"""
    _postprocess_function(input_items, run_postprocess_function)

Collect the `data` field of every input item into a vector, pass that vector to
`run_postprocess_function`, and return its result.

Throws an `ErrorException` when `input_items` is empty.
"""
function _postprocess_function(input_items::Vector, run_postprocess_function::Function)
    if isempty(input_items)
        error("Bug: no results were passed to run_postprocess_function")
    end
    results = [item.data for item in input_items]
    return run_postprocess_function(results)
end

"""
    _store_result(api, workflow_id, job_id, result)

Store `result` in the user data item that `job_id` produces.

The item is looked up with `APIClient.list_user_data` filtered by
`producer_job_id`. Exactly one item must come back; its `data` field is replaced
with `result` and the item is written back with `APIClient.update_user_data`.

Throws an `ErrorException` if the lookup does not return exactly one item.
"""
function _store_result(api, workflow_id::Int64, job_id::Int64, result::Dict)
    response =
        send_api_command(api, APIClient.list_user_data, workflow_id; producer_job_id = job_id)

    placeholders = response.items
    if length(placeholders) != 1
        message =
            "Received unexpected output data placeholder from database: " *
            "job_id = $(job_id) response = $(response)"
        error(message)
    end

    # Overwrite the fetched item's data and send the whole item back.
    placeholder = placeholders[1]
    placeholder.data = result
    send_api_command(api, APIClient.update_user_data, placeholder.id, placeholder)
    @info "Stored result for $(job_id)"
end