orion-server 0.2.0

Declarative services runtime powered by dataflow-rs
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

Orion is a declarative services runtime written in Rust. It exposes business logic management through channels (service endpoints) and workflows (task pipelines powered by dataflow-rs) via a REST API. Ships as a single binary with an embedded SQLite database.

- **Rust Edition:** 2024. **MSRV: 1.88** (`rust-version` in Cargo.toml) — driven by let-chains (`if let Some(x) = a && let Some(y) = b`, stabilized in 1.88) and dependency requirements (`mongodb`, `serde_with`, `time`, `tonic`).
- **Core dependencies:** `dataflow-rs` 3.0 (workflow engine), `datalogic-rs` 5 (JSONLogic), `axum` 0.8 (HTTP), `sqlx` 0.8 (database), `sea-query` 0.32 (portable SQL builder)
- **Single binary:** `orion-server` (server, `src/main.rs`)

## Build & Development Commands

```bash
cargo build                        # Build (all features included)
cargo build --release              # Release build

cargo run -- --config ./config.toml  # Run with config file

cargo test                         # Run all tests
cargo test <test_name>             # Run a single test by name

cargo clippy                       # Lint
cargo fmt                          # Format code
```

Docker: `docker build -t orion .` (multi-stage: rust:1.93-slim -> debian:trixie-slim).

## Runtime Configuration

All capabilities are compiled into a single binary — no feature flags. Behavior is controlled at runtime:

| Capability | Configuration | Default |
|-----------|--------------|---------|
| Database backend | `storage.url` scheme (`sqlite:`, `postgres://`, `mysql://`) | SQLite |
| Kafka | `kafka.enabled` | Disabled |
| OpenTelemetry | `tracing.enabled` | Disabled |
| Trace persistence mode | `tracing.storage.mode` (`sync` / `async` / `batch` / `off`) — global default with per-channel override via `config.tracing` | Sync |
| TLS/HTTPS | `server.tls.enabled` | Disabled |
| Swagger UI | Always at `/docs` | Enabled |
| SQL connectors | `db_read`/`db_write` functions | Always available |
| Redis cache | `cache_read`/`cache_write` with Redis backend | Always available |
| MongoDB connector | `mongo_read` function | Always available |

## Architecture

### Module Structure

```
src/
├── main.rs              # CLI entrypoint, startup sequence
├── lib.rs               # Public module declarations
├── channel/             # Channel registry, config, routing, deduplication
├── config/              # Configuration loading & validation
├── connector/           # Connector types, registry, circuit breakers, pool caching
├── engine/              # Dataflow engine & custom function handlers
│   └── functions/       # http_call, channel_call, db_read/write, cache_read/write, etc.
├── errors.rs            # OrionError enum → HTTP response mapping
├── kafka/               # Kafka producer & consumer
├── metrics/             # Prometheus metrics collection
├── queue/               # Async trace processing, DLQ retry
├── server/              # HTTP server, routes, middleware, state
│   └── routes/          # admin/ (workflows, channels, connectors, engine, audit, backups), data
├── storage/             # Database abstraction, models, repositories
│   └── repositories/    # workflows, channels, connectors, traces, trace_dlq, audit_logs
└── validation/          # Input validation, SSRF protection
```

### Startup Sequence (main.rs)

CLI args → config (TOML + `ORION_SECTION__KEY` env overrides) → tracing → metrics → detect DB backend from URL → DB pool + migrations → repositories (workflows, channels, connectors, traces, audit_logs) → ConnectorRegistry → HTTP client → engine lock (pre-created for channel_call) → cache pool → external pool caches (SQL, MongoDB) → custom functions → Kafka producer (if enabled) → load active channels + workflows → filter by include/exclude patterns → build engine → populate engine lock → reload ChannelRegistry → Kafka consumer (if enabled, config + DB topics merged) → trace queue workers → trace cleanup → DLQ retry → rate limiter → Axum HTTP server → graceful shutdown on SIGTERM/SIGINT.

### Key Architectural Patterns

- **Channels + Workflows:** Channels are service endpoints (sync/async, REST/HTTP/Kafka) that link to workflows. Workflows are versioned task pipelines with JSONLogic conditions. A channel references a workflow via `workflow_id`.
- **Repository pattern:** Trait-based (`WorkflowRepository`, `ChannelRepository`, `ConnectorRepository`, `TraceRepository`, `TraceDlqRepository`, `AuditLogRepository`) with SQL implementations. Traits use `async_trait`. All repos are stored as `Arc<dyn Trait>` in `AppState`.
- **Engine hot-reload:** Engine is `Arc<RwLock<Arc<Engine>>>`. Double-Arc allows swapping the inner engine while readers hold the old one. Reload triggers on status changes (activate/archive), delete, and manually via `POST /api/v1/admin/engine/reload`. Draft creates/updates do not trigger reload. Also rebuilds `ChannelRegistry` and restarts Kafka consumer if topic set changed.
- **Channel registry:** In-memory `ChannelRegistry` (`channel/registry.rs`) holds `ChannelRuntimeConfig` per active channel — parsed config, rate limiters, compiled validation logic, backpressure semaphores, dedup stores, response caches. Has a `RouteTable` for REST route matching (method + path pattern with parameter extraction). Rebuilt on engine reload.
- **Custom async functions:** 8 handlers implement `dataflow_rs::engine::functions::AsyncFunctionHandler`, registered in `engine/mod.rs::build_custom_functions()`: `http_call`, `channel_call`, `cache_read`, `cache_write`, `db_read`, `db_write`, `mongo_read`, `publish_kafka`
- **Connector registry:** In-memory `RwLock<HashMap<String, Arc<ConnectorConfig>>>` with secret masking on API reads, circuit breakers per connector with LRU eviction.
- **Trace queue:** `tokio::sync::mpsc` channel with semaphore-limited concurrency for async trace processing (`queue/mod.rs`). Failed traces go to DLQ table with automatic retry.
- **Error handling:** `OrionError` enum in `errors.rs` implements `axum::response::IntoResponse`, mapping variants to HTTP status codes. Returns JSON `{"error": {"code": "...", "message": "..."}}`.
- **AppState** (`server/state.rs`): Central shared state struct holding engine, all repos, connector registry, cache pool, channel registry, trace queue, config, metrics handle, HTTP client, DataLogic instance, rate limit state, readiness flag, sql_pool_cache, mongo_pool_cache, kafka_consumer_handle, and kafka_producer. Passed to all route handlers via Axum's `State` extractor.

### Middleware Stack (server/mod.rs)

1. CatchPanicLayer (outermost — panic recovery)
2. OTel trace context extraction (if `tracing.enabled`)
3. HTTP metrics middleware
4. Admin auth middleware (if enabled)
5. Rate limiting middleware (if enabled)
6. Body limit (max payload size)
7. Compression (gzip/brotli)
8. Security headers (CSP, X-Frame-Options, X-Content-Type-Options, Referrer-Policy, Permissions-Policy, HSTS)
9. Request ID layer (generate/propagate x-request-id)
10. Trace layer (request/response tracing)
11. CORS layer

### Request Processing Flow

```
HTTP Request → Axum Router → Data Route Handler
  → Route Resolution (REST pattern match → channel name lookup → fallback)
  → Channel Registry (dedup check, rate limit, validation, backpressure, cache check)
  → Engine (RwLock<Arc<Engine>>)
    → Channel Router (match by channel name)
    → Workflow Matcher (JSONLogic condition evaluation + rollout bucket)
    → Task Pipeline (ordered function execution)
  → Response (cache store, JSON response)
```

### API Structure

- **Admin** (`/api/v1/admin/`):
  - **Channels:** CRUD, status management (draft/active/archived), versioning
  - **Workflows:** CRUD, status management, versioning, rollout, dry-run test, import/export, validate
  - **Connectors:** CRUD, reload, circuit breakers (list/reset)
  - **Engine:** status, reload
  - **Audit logs:** list with filtering
  - **Backup/Restore:** database export and import
- **Data** (`/api/v1/data/`): Dynamic handler `/{*path}` — resolves to channel via REST route match or name lookup. Supports sync and async (trailing `/async`). Trace list/get endpoints.
- **Operational:** `GET /health`, `GET /healthz` (liveness), `GET /readyz` (readiness), `GET /metrics`
- **API docs:** `GET /docs` (Swagger UI), `GET /api/v1/openapi.json`

### Database

SQLite (default), PostgreSQL, or MySQL — selected at runtime from `storage.url` scheme. All three migration sets are embedded via `sqlx::migrate!()` and the correct set is chosen at startup based on the detected backend (`DbBackend` enum in `storage/mod.rs`). `DbPool` is an enum wrapping the concrete pool types (`SqlitePool`/`PgPool`/`MySqlPool`) with dispatch helpers for query execution. Tables: `workflows` (composite PK `(workflow_id, version)`), `channels` (composite PK `(channel_id, version)`), `connectors`, `traces`, `trace_dlq`, `audit_logs`. Views: `current_workflows`, `current_channels` (latest version per ID). Triggers enforce single-draft-per-ID and active-immutability constraints. Migrations per backend in `migrations/{sqlite,postgres,mysql}/`.

## Testing

- **Integration tests** in `tests/`: Use `common::test_app()` which creates an in-memory SQLite DB, full `AppState`, and Axum router. Tests use `tower::ServiceExt::oneshot()` (no HTTP server needed).
- **Test helpers** in `tests/common/mod.rs`:
  - `test_app()` — returns a ready-to-use `Router` with in-memory DB
  - `json_request(method, uri, body)` — builds an HTTP `Request<Body>` with JSON content-type
  - `body_json(response)` — extracts and parses the response body as `serde_json::Value`
- **Pattern for new integration tests:** Clone the app, call `.oneshot(json_request(...))`, assert status, parse body with `body_json()`. See `tests/admin_workflows_test.rs` for examples.
- **Test files:** `admin_workflows_test`, `admin_channels_test`, `admin_connectors_test`, `channel_call_test`, `channel_config_test`, `rest_routing_test`, `rate_limit_test`, `concurrency_test`, `async_traces_test`, `error_paths_test`, `security_test`, `shutdown_test`, `pool_exhaustion_test`, `openapi_test`, `kafka_test`.
- **Benchmarks:** `tests/benchmark/bench.sh` — 6 scenarios using `hey` HTTP load generator.

## Configuration

See `config.toml.example`. All settings have sensible defaults. Environment variables override via `ORION_SECTION__KEY` format (e.g., `ORION_SERVER__PORT=3000`).

### CLI Commands

```bash
orion-server                              # Start server
orion-server -c config.toml               # Start with config
orion-server validate-config              # Validate config
orion-server migrate                      # Run migrations
orion-server migrate --dry-run            # Preview migrations
```