Bria
Briareus — One Command. Hundred Actions.
Bria is a Rust-based multi-pipeline job orchestrator. It ingests jobs from files, HTTP/webhooks, AMQP, cron, PostgreSQL, or SQLite, runs local, Docker, or WebAssembly tasks, and emits results to files, webhooks, AMQP, databases, or live streams.
Quick start
CLI
| Command | Description |
|---|---|
| bria --config Config.toml | Validate configuration and run Bria. |
| bria ping | Print pong. |
The --config path can also be supplied through the BRIA_CONFIG environment variable; it defaults to Config.toml.
Configuration model
| Section | Purpose |
|---|---|
| [global] | Runtime, logging, state, retry, and timeout defaults. |
| [server] | Optional HTTP control plane for HTTP/webhook sources and streams. |
| [[sources]] | Inputs that produce jobs. |
| [[tasks]] | Reusable task definitions. |
| [[sinks]] | Outputs that receive pipeline results. |
| [[pipelines]] | DAGs connecting sources, tasks, and sinks. |
Environment variables in the form ${VAR_NAME} are resolved during config loading. Missing variables fail fast.
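
As a minimal sketch of this substitution, the fragment below reads the server API key from the environment instead of hard-coding it. BRIA_API_KEY is an assumed variable name chosen for illustration, not one Bria defines; if it is unset, loading should fail as described above.

```toml
[server]
enabled = true
# BRIA_API_KEY is a hypothetical variable name; it is resolved while the config is loaded.
api_key = "${BRIA_API_KEY}"
```
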
Parameters
Global
| Key | Default | Description |
|---|---|---|
| worker_threads | 0 | Tokio worker threads; 0 uses logical CPUs. |
| shutdown_timeout_secs | 30 | Orchestrator shutdown timeout. |
| tmp_dir | OS temp dir | Temporary file directory. |
| max_payload_bytes | 10485760 | Maximum job payload size. |
| cancel_signal_ttl_secs | 3600 | How long cancellation signals are retained. |
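
A [global] block that spells out the defaults from the table might look like the sketch below. The tmp_dir path is a hypothetical value; every other value is the documented default.

```toml
[global]
worker_threads = 0             # 0 = one worker per logical CPU
shutdown_timeout_secs = 30
tmp_dir = "/var/tmp/bria"      # hypothetical path; omit to use the OS temp dir
max_payload_bytes = 10485760   # 10 MiB
cancel_signal_ttl_secs = 3600  # 1 hour
```
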
Logging: [global.log]
| Key | Default | Values |
|---|---|---|
| level | info | trace, debug, info, warn, error |
| format | auto | text, json; auto uses text on TTY and JSON otherwise |
| file | "" | Optional log file path |
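
For instance, a deployment that wants verbose, machine-readable logs written to disk could use something like the following. The log file path is a hypothetical value.

```toml
[global.log]
level = "debug"
format = "json"                   # force JSON even when attached to a TTY
file = "/var/log/bria/bria.log"   # hypothetical path
```
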
State: [global.state]
| Key | Default | Description |
|---|---|---|
| backend | memory | memory, sqlite, or pg. |
| sqlite_path | bria-state.db | SQLite state database. |
| pg_url | "" | Required when backend = "pg". |
State stores queued/running job records for restart recovery. Schema is created automatically on first use.
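
A sketch of a persistent setup, assuming SQLite is sufficient for a single-node deployment. The commented lines show the PostgreSQL variant; DATABASE_URL is an assumed environment variable name.

```toml
[global.state]
backend = "sqlite"
sqlite_path = "bria-state.db"   # the documented default

# PostgreSQL alternative (pg_url is required with this backend):
# backend = "pg"
# pg_url = "${DATABASE_URL}"    # hypothetical variable name
```
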
Retry and timeout defaults
| Section | Keys |
|---|---|
| [global.retry] | max_attempts, base_delay_ms, max_delay_ms, jitter |
| [global.timeout] | step_secs, action (kill/term), kill_grace_secs |
Retry precedence: step > task > global. Backoff uses exponential delay and random jitter.
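
The global defaults could be set as in the sketch below. All numbers are illustrative rather than documented defaults, and jitter is left out because its value type is not stated above. Given the precedence rule, a task-level or step-level retry section would override these values.

```toml
[global.retry]
max_attempts = 3       # illustrative
base_delay_ms = 500    # first backoff delay
max_delay_ms = 30000   # cap for the exponential delay

[global.timeout]
step_secs = 300        # illustrative
action = "term"        # or "kill"
kill_grace_secs = 10   # grace period after SIGTERM
```
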
Server: [server]
| Key | Default | Description |
|---|---|---|
| enabled | false | Enable HTTP server. |
| bind | 0.0.0.0 | Bind address. |
| port | 4000 | Listen port. |
| prefix | v1 | Route prefix. |
| api_key | "" | Optional API key for all routes. Use Authorization: Bearer or X-Bria-Api-Key. |
| dashboard | "" | Static dashboard directory. |
| shutdown_timeout_secs | 5 | HTTP drain timeout. |
| max_body_bytes | 52428800 | Server-wide body limit. |
Routes: GET /{prefix}/ping, POST /{prefix}/{source.path}, DELETE /{prefix}/{source.path}/{job_id}, POST /{prefix}/pipelines/{id}/resume, plus configured SSE/WebSocket stream paths.
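
To make the route patterns concrete, here is a hedged sketch of a [server] block with comments showing the paths it would expose for a source whose path is jobs. The loopback bind address and the jobs path are illustrative choices.

```toml
[server]
enabled = true
bind = "127.0.0.1"          # illustrative; the default is 0.0.0.0
port = 4000
prefix = "v1"
max_body_bytes = 52428800   # 50 MiB

# With prefix = "v1" and a source path of "jobs", the patterns above become:
#   GET    /v1/ping
#   POST   /v1/jobs
#   DELETE /v1/jobs/{job_id}
```
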
Sources
| Type | Required | Important parameters |
|---|---|---|
| file | path | poll_interval_secs, track_cursor, authoritative, id_field, max_body_bytes, labels |
| http | path, server.enabled=true | max_body_bytes, id_field, labels |
| webhook | path, server.enabled=true | hmac_secret, hmac_header, ack_status, max_body_bytes |
| queue | url, exchange | username, password, submit_routing_key, cancel_routing_key, reconnect_secs, qos_prefetch, consumer_tag |
| cron | schedule | tz, [sources.payload], labels |
| pg | url, [sources.table] | poll_interval_secs, table column names/status values |
| sqlite | path, [sources.table] | same table parameters as pg |
Table source columns: id, payload, created_at, status, status_claimed_value, status_done_value, status_failed_value.
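
The sketch below shows how two of these source types might be declared. The id and type key names are assumptions inferred from the Type column and from pipelines referring to a source id; the file names, header name, and numeric values are hypothetical.

```toml
[[sources]]
id = "inbox"              # assumed key name
type = "file"             # assumed key name
path = "jobs.jsonl"       # hypothetical file
poll_interval_secs = 5
id_field = "id"

[[sources]]
id = "github"
type = "webhook"          # requires server.enabled = true
path = "hooks/github"     # hypothetical route segment
hmac_secret = "${GITHUB_WEBHOOK_SECRET}"   # hypothetical variable name
hmac_header = "X-Hub-Signature-256"        # illustrative header
ack_status = 202          # assumed to be an integer HTTP status
```
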
Tasks
| Key | Default | Description |
|---|---|---|
| id | required | Task identifier. |
| driver | local | local, docker, or wasm. |
| cmd | required | Command, image, or .wasm path. Supports templates. |
| args | [] | Argument templates. |
| inherit_env | false | Keep parent environment. |
| working_dir | current dir | Child working directory. |
| success_exit_codes | [0] | Successful exit codes. |
| timeout_secs | global | Per-task timeout. |
| timeout_action | global | kill or term. |
| kill_grace_secs | global | Grace after SIGTERM. |
| [tasks.env] | {} | Environment variables/templates. |
| [tasks.stdin] | mode="none" | none, payload, or template. |
| [tasks.stdout] / [tasks.stderr] | capture | mode: capture, stream, discard; max_bytes. |
| [tasks.retry] | global | Retry overrides. |
Driver-specific sections:
| Section | Keys |
|---|---|
| [tasks.docker] | flags, mounts, pull (always, missing, never) |
| [tasks.wasm] | dirs, max_memory_pages, fuel |
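
As an illustration of the task keys above, the sketch below defines a local task that receives the job payload on stdin, plus a Docker task. Task names, limits, and the image tag are hypothetical; treating cmd as the image name under the docker driver is an inference from the cmd description.

```toml
[[tasks]]
id = "word-count"          # hypothetical task
driver = "local"
cmd = "wc"
args = ["-w"]
success_exit_codes = [0]
timeout_secs = 60
timeout_action = "term"
kill_grace_secs = 5

[tasks.stdin]
mode = "payload"           # feed the job payload to the command

[tasks.stdout]
mode = "capture"
max_bytes = 65536          # illustrative limit

[[tasks]]
id = "word-count-docker"   # hypothetical task
driver = "docker"
cmd = "alpine:3.20"        # assumed: cmd names the image for this driver

[tasks.docker]
pull = "missing"           # one of always, missing, never
```
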
Sinks
| Type | Required | Parameters |
|---|---|---|
| file | path | template |
| webhook | url | secret, signature_header, content_type, max_retries, retry_base_ms, timeout_secs, headers |
| queue | url, exchange | username, password, success_routing_key, failure_routing_key, reconnect_secs |
| pg | url, [sinks.table] | Result table and column names |
| sqlite | path, [sinks.table] | Result table and column names |
| stream | server.enabled=true | sse, websocket, ws_heartbeat_secs, sse_keepalive_secs, broadcast_capacity |
Table sink columns: result_id, job_id, pipeline_id, step_id, occurred_at, exit_code, stdout, stderr, duration_ms, attempt, status.
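
A hedged sketch of two sinks follows. As with sources, the id and type key names are assumptions; the URL, variable name, and retry values are hypothetical.

```toml
[[sinks]]
id = "results"            # assumed key name
type = "file"             # assumed key name
path = "results.jsonl"

[[sinks]]
id = "notify"
type = "webhook"
url = "https://example.com/hooks/bria"   # hypothetical endpoint
secret = "${WEBHOOK_SECRET}"             # hypothetical variable name
content_type = "application/json"
max_retries = 5           # illustrative
retry_base_ms = 250       # illustrative
timeout_secs = 10         # illustrative
```
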
Pipelines and steps
| Key | Description |
|---|---|
| id | Pipeline identifier. |
| source | Single source id. |
| sources | Multiple source entries for merge pipelines. |
| [pipelines.merge] | strategy (any/all), correlation_key or correlation_expr, timeout_secs. |
| concurrency | Maximum concurrent steps/jobs. |
| queue_capacity | Bounded channel size. |
| sinks | Pipeline-level sinks. |
| [pipelines.failure] | action (discard, dead_letter, stop) and optional sink. |
| labels | Labels merged into jobs. |
Step types:
| Type | Required | Behavior |
|---|---|---|
| process | task | Runs a task. |
| map | [[pipelines.steps.set]] | Mutates job.payload using CEL expressions. |
| condition | expr | On false, action = "fail", "skip_to", or "emit". |
Step parameters include depends_on, [with] overrides, [outputs], [retry], [failure], sinks, and [[routing]] conditional sinks.
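
Putting the pipeline and step keys together, a two-step pipeline might be sketched as follows. The [[pipelines.steps]] header and the step keys id, type, and task are inferred from the tables above; every referenced source, task, and sink id is hypothetical and would need to be defined elsewhere in the config.

```toml
[[pipelines]]
id = "thumbnails"          # hypothetical pipeline
source = "inbox"           # id of a [[sources]] entry
sinks = ["results"]        # ids of [[sinks]] entries
concurrency = 4            # illustrative
queue_capacity = 100       # illustrative

[pipelines.failure]
action = "dead_letter"     # one of discard, dead_letter, stop
sink = "dead-letters"      # hypothetical sink id

[[pipelines.steps]]
id = "fetch"
type = "process"
task = "download"          # hypothetical task id

[[pipelines.steps]]
id = "resize"
type = "process"
task = "resize"            # hypothetical task id
depends_on = ["fetch"]     # run only after the fetch step
```
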
Templates and expressions
Templates use MiniJinja and can access job.*, steps.*, env.*, now, now_unix, pipeline.*, result.*, and occurred_at depending on context.
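
For example, a task could interpolate payload fields into its arguments and environment, as in the sketch below. The task and the payload fields file, bucket, and key are hypothetical; only the job.payload and now_unix template variables come from the list above.

```toml
[[tasks]]
id = "upload"              # hypothetical task
driver = "local"
cmd = "aws"
# Each argument is rendered as a MiniJinja template before the command runs.
args = ["s3", "cp", "{{ job.payload.file }}", "s3://{{ job.payload.bucket }}/{{ job.payload.key }}"]

[tasks.env]
STARTED_AT = "{{ now_unix }}"
```
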
CEL expressions can read job.*, steps.*, and pipeline.*:
[[]]
= "job.payload.output_url"
= '"s3://" + job.payload.bucket + "/" + job.payload.key'
Example: HTTP job to local task and file sink

    [server]
    enabled = true
    port = 4000

    [[sources]]
    id = "api"
    type = "http"
    path = "jobs"
    id_field = "id"

    [[tasks]]
    id = "hello"
    driver = "local"
    cmd = "sh"
    args = ["-c", "printf '{\"message\":\"hello %s\"}' \"$1\"", "sh", "{{job.payload.name}}"]

    [[sinks]]
    id = "results"
    type = "file"
    path = "results.jsonl"

    [[pipelines]]
    id = "hello-pipeline"
    source = "api"
    sinks = ["results"]

    [[pipelines.steps]]
    id = "run"
    type = "process"
    task = "hello"

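Send a job by POSTing a JSON body with id and name fields to /v1/jobs on port 4000 (the default v1 prefix plus the source path).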
Docker
The default CMD passes --config /etc/bria/Config.toml. Override it to run a one-shot check or the built-in health command, bria ping.
The image includes an OCI HEALTHCHECK that calls bria ping every 30 s.
E2E Docker Compose files and run script live in tests/e2e/ — see tests/e2e/README.md.
Developer functions and exported API
| Item | Purpose |
|---|---|
| Config::load_from_path | Load TOML with environment substitution. |
| Config::from_str_with_env | Parse TOML string with ${VAR} expansion. |
| Config::validate | Validate references and type-specific requirements. |
| Config::get_task, Config::get_sink | Lookup helpers. |
| Orchestrator::new | Initialize logging and state store. |
| Orchestrator::run | Start sources, server, routers, workers, and sinks. |
| run_pipeline_once | Execute one pipeline in tests or embedded use. |
| create_store | Create memory/SQLite/PostgreSQL state store. |
| StateStore | Trait for queued/running/completed state and recovery. |
Testing
The test suite has two parts: lint plus unit/integration tests, and end-to-end scenarios, which require Docker. See tests/e2e/README.md for the full scenario list and architecture.
License
MIT