bria 0.1.3

Multi-pipeline job orchestrator
Documentation

Bria

Briareus — One Command. Hundred Actions.

Bria is a Rust-based multi-pipeline job orchestrator. It ingests jobs from files, HTTP/webhooks, AMQP, cron, PostgreSQL, or SQLite, runs local, Docker, or WebAssembly tasks, and emits results to files, webhooks, AMQP, databases, or live streams.

Quick start

cargo install bria

The default install is intentionally lightweight and includes core file/local/Docker orchestration. Enable optional integrations only when needed, for example:

cargo install bria --features full
cargo install bria --features server,webhook,sqlite
bria --config Config.toml

CLI

Command Description
bria --config Config.toml Validate configuration and run Bria.
bria ping Print pong.

--config can also be supplied with BRIA_CONFIG. The default is Config.toml.

Configuration model

Section Purpose
[global] Runtime, logging, state, retry, and timeout defaults.
[server] Optional HTTP control plane for HTTP/webhook sources and streams.
[[sources]] Inputs that produce jobs.
[[tasks]] Reusable task definitions.
[[sinks]] Outputs that receive pipeline results.
[[pipelines]] DAGs connecting sources, tasks, and sinks.

Environment variables in the form ${VAR_NAME} are resolved during config loading. Missing variables fail fast.

Parameters

Global

Key Default Description
worker_threads 0 Tokio worker threads; 0 uses logical CPUs.
shutdown_timeout_secs 30 Orchestrator shutdown timeout.
tmp_dir OS temp dir Temporary file directory.
max_payload_bytes 10485760 Maximum job payload size.
cancel_signal_ttl_secs 3600 How long cancellation signals are retained.

Logging: [global.log]

Key Default Values
level info trace, debug, info, warn, error
format auto text, json; auto uses text on TTY and JSON otherwise
file "" Optional log file path

State: [global.state]

Key Default Description
backend memory memory, sqlite, or pg.
sqlite_path bria-state.db SQLite state database.
pg_url "" Required when backend = "pg".

State stores queued/running job records for restart recovery. Schema is created automatically on first use.

Retry and timeout defaults

Section Keys
[global.retry] max_attempts, base_delay_ms, max_delay_ms, jitter
[global.timeout] step_secs, action (kill/term), kill_grace_secs

Retry precedence: step > task > global. Backoff uses exponential delay and random jitter.

Server: [server]

Key Default Description
enabled false Enable HTTP server.
bind 0.0.0.0 Bind address.
port 4000 Listen port.
prefix v1 Route prefix.
api_key "" Optional API key for all routes. Use Authorization: Bearer or X-Bria-Api-Key.
dashboard "" Static dashboard directory.
shutdown_timeout_secs 5 HTTP drain timeout.
max_body_bytes 52428800 Server-wide body limit.

Routes: GET /{prefix}/ping, POST /{prefix}/{source.path}, DELETE /{prefix}/{source.path}/{job_id}, POST /{prefix}/pipelines/{id}/resume, plus configured SSE/WebSocket stream paths.

Sources

Type Required Important parameters
file path poll_interval_secs, track_cursor, authoritative, id_field, max_body_bytes, labels
http path, server.enabled=true max_body_bytes, id_field, labels
webhook path, server.enabled=true hmac_secret, hmac_header, ack_status, max_body_bytes
queue url, exchange username, password, submit_routing_key, cancel_routing_key, reconnect_secs, qos_prefetch, consumer_tag
cron schedule tz, [sources.payload], labels
pg url, [sources.table] poll_interval_secs, table column names/status values
sqlite path, [sources.table] same table parameters as pg

Table source columns: id, payload, created_at, status, status_claimed_value, status_done_value, status_failed_value.

Tasks

Key Default Description
id required Task identifier.
driver local local, docker, or wasm.
cmd required Command, image, or .wasm path. Supports templates.
args [] Argument templates.
inherit_env false Keep parent environment.
working_dir current dir Child working directory.
success_exit_codes [0] Successful exit codes.
timeout_secs global Per-task timeout.
timeout_action global kill or term.
kill_grace_secs global Grace after SIGTERM.
[tasks.env] {} Environment variables/templates.
[tasks.stdin] mode="none" none, payload, or template.
[tasks.stdout] / [tasks.stderr] capture mode: capture, stream, discard; max_bytes.
[tasks.retry] global Retry overrides.

Driver-specific sections:

Section Keys
[tasks.docker] flags, mounts, pull (always, missing, never)
[tasks.wasm] dirs, max_memory_pages, fuel

Sinks

Type Required Parameters
file path template
webhook url secret, signature_header, content_type, max_retries, retry_base_ms, timeout_secs, headers
queue url, exchange username, password, success_routing_key, failure_routing_key, reconnect_secs
pg url, [sinks.table] Result table and column names
sqlite path, [sinks.table] Result table and column names
stream server.enabled=true sse, websocket, ws_heartbeat_secs, sse_keepalive_secs, broadcast_capacity

Table sink columns: result_id, job_id, pipeline_id, step_id, occurred_at, exit_code, stdout, stderr, duration_ms, attempt, status.

Pipelines and steps

Key Description
id Pipeline identifier.
source Single source id.
sources Multiple source entries for merge pipelines.
[pipelines.merge] strategy (any/all), correlation_key or correlation_expr, timeout_secs.
concurrency Maximum concurrent steps/jobs.
queue_capacity Bounded channel size.
sinks Pipeline-level sinks.
[pipelines.failure] action (discard, dead_letter, stop) and optional sink.
labels Labels merged into jobs.

Step types:

Type Required Behavior
process task Runs a task.
map [[pipelines.steps.set]] Mutates job.payload using CEL expressions.
condition expr On false, action = "fail", "skip_to", or "emit".

Step parameters include depends_on, [with] overrides, [outputs], [retry], [failure], sinks, and [[routing]] conditional sinks.

Templates and expressions

Templates use MiniJinja and can access job.*, steps.*, env.*, now, now_unix, pipeline.*, result.*, and occurred_at depending on context.

CEL expressions can read job.*, steps.*, and pipeline.*:

[[pipelines.steps.set]]
target = "job.payload.output_url"
expr = '"s3://" + job.payload.bucket + "/" + job.payload.key'

Example: HTTP job to local task and file sink

[server]
enabled = true
port = 4000

[[sources]]
id = "api"
type = "http"
path = "jobs"
id_field = "id"

[[tasks]]
id = "hello"
driver = "local"
cmd = "sh"
args = ["-c", "printf '{\"message\":\"hello %s\"}' \"$1\"", "sh", "{{job.payload.name}}"]

[[sinks]]
id = "results"
type = "file"
path = "results.jsonl"

[[pipelines]]
id = "hello-pipeline"
source = "api"
sinks = ["results"]

[[pipelines.steps]]
id = "run"
type = "process"
task = "hello"

Send a job:

curl -X POST http://localhost:4000/v1/jobs \
  -H 'content-type: application/json' \
  -d '{"id":"job-1","name":"Bria"}'

Docker

docker run --rm -p 4000:4000 \
  -v "$PWD/Config.toml:/etc/bria/Config.toml:ro" \
  ghcr.io/melonask/bria:latest

The default CMD passes --config /etc/bria/Config.toml. Override it to run a one-shot check or the built-in health command:

docker run --rm bria:latest ping          # always works, no config needed

The image includes an OCI HEALTHCHECK that calls bria ping every 30 s.

E2E Docker Compose files and run script live in tests/e2e/ — see tests/e2e/README.md.

Developer functions and exported API

Item Purpose
Config::load_from_path Load TOML with environment substitution.
Config::from_str_with_env Parse TOML string with ${VAR} expansion.
Config::validate Validate references and type-specific requirements.
Config::get_task, Config::get_sink Lookup helpers.
Orchestrator::new Initialize logging and state store.
Orchestrator::run Start sources, server, routers, workers, and sinks.
run_pipeline_once Execute one pipeline in tests or embedded use.
create_store Create memory/SQLite/PostgreSQL state store.
StateStore Trait for queued/running/completed state and recovery.

Testing

# Lint and unit/integration tests
cargo fmt --check
cargo clippy --all-targets --all-features -- -D warnings
cargo test

# End-to-end scenarios (requires Docker)
cd tests/e2e
./run.sh --all                    # build, run all 19 scenarios (~6 min), tear down
./run.sh --infra-up               # start shared infra (postgres, rabbitmq, etc.)
./run.sh http-pg                  # run a single scenario
./run.sh --infra-down             # tear down shared infra
# See tests/e2e/README.md for the full scenario list and architecture

License

MIT