# rsigma-runtime
Streaming runtime for [rsigma](https://github.com/timescale/rsigma) — input format adapters, batch log processing, hot-reload, dynamic source resolution, and pluggable metrics.
## Features
- **Input adapters**: JSON/NDJSON, syslog (RFC 3164/5424), logfmt, CEF, EVTX (Windows Event Log), plain text, and auto-detect. Line-oriented adapters parse raw log lines into typed events implementing the `rsigma_eval::Event` trait. The EVTX adapter reads binary `.evtx` files directly via `EvtxFileReader` and yields `serde_json::Value` records.
- **`LogProcessor`**: batch evaluation pipeline with atomic engine swap via `ArcSwap`, `MetricsHook` for pluggable metrics, and `EventFilter` for JSON payload extraction.
- **`RuntimeEngine`**: wraps `Engine` and `CorrelationEngine` with rule loading, reload, and correlation state management.
- **Dynamic source resolution**: `SourceResolver` trait with `DefaultSourceResolver` implementation fetching data from files, commands, HTTP APIs, and NATS subjects. Includes template expansion, extraction (jq/JSONPath/CEL), caching with TTL, and scheduled refresh.
- **I/O**: `EventSource` trait (stdin, HTTP, NATS) and `Sink` enum (stdout, file, NATS) with fan-out support.
- **OTLP**: `LogRecord`-to-JSON conversion for OpenTelemetry log ingestion (feature-gated under `otlp`). Resource and log attributes are flattened for direct Sigma rule matching.
## Usage
```rust
use std::sync::Arc;
use rsigma_eval::CorrelationConfig;
use rsigma_runtime::{InputFormat, LogProcessor, NoopMetrics, RuntimeEngine};
let mut engine = RuntimeEngine::new(
"rules/".into(),
vec![],
CorrelationConfig::default(),
false,
);
engine.load_rules().unwrap();
let processor = LogProcessor::new(engine, Arc::new(NoopMetrics));
let batch = vec![r#"{"CommandLine": "cmd /c whoami"}"#.to_string()];
let results = processor.process_batch_with_format(
&batch,
&InputFormat::Json,
None,
);
```
See the [examples](examples/) directory for complete working programs.
## Dynamic Source Resolution
The `sources` module provides the infrastructure for resolving external data at pipeline load time. This allows pipeline values to be populated from live data rather than hardcoded.
### Core types
```rust
use rsigma_runtime::{DefaultSourceResolver, SourceResolver, SourceCache, ResolvedValue, SourceError};
// Create a resolver with in-memory cache
let resolver = DefaultSourceResolver::new();
// Or with SQLite-backed persistence and TTL
use std::time::Duration;
let cache = SourceCache::with_sqlite_and_ttl(
std::path::Path::new("/tmp/rsigma-cache.db"),
Some(Duration::from_secs(3600)),
).unwrap();
let resolver = DefaultSourceResolver::with_cache(cache);
```
### Resolution flow
1. **Fetch**: the source type (file, command, HTTP, NATS) determines how raw data is obtained.
2. **Parse**: raw bytes are parsed according to the declared `DataFormat` (JSON, YAML, lines, CSV).
3. **Extract**: an optional expression (jq, JSONPath, or CEL) selects a subset of the parsed data.
4. **Cache**: successful results are stored for `use_cached` error policy fallback.
5. **Template expansion**: `TemplateExpander` substitutes `${source.<id>}` references with resolved values.
### Resolving all sources
```rust
use rsigma_runtime::sources::resolve_all;
// sources: &[DynamicSource] from the parsed pipeline
let resolved_map = resolve_all(&resolver, &pipeline.sources).await?;
// resolved_map: HashMap<String, serde_json::Value>
// Or with PipelineState tracking:
use rsigma_runtime::sources::resolve_all_with_state;
let resolved_map = resolve_all_with_state(&resolver, &pipeline.sources, Some(&mut state)).await?;
```
### Extraction languages
The `extract` module supports three languages for selecting data from resolved sources:
| jq | Complex transformations, array iteration, filtering | `.indicators[].ip` |
| JSONPath | Simple path queries into nested JSON | `$.data.items[*].value` |
| CEL | Typed expressions with filtering and aggregation | `data.filter(x, x.score > 7)` |
### Refresh scheduling
The `refresh` module manages automatic re-resolution:
- **Interval**: periodic timer fires resolution on a configurable cadence.
- **Watch**: file system notifications (via `notify`) trigger re-resolution when a file source changes.
- **Push**: NATS messages on the source subject trigger immediate updates.
- **OnDemand**: resolution only happens when triggered via API, SIGHUP, or NATS control subject (`rsigma.control.resolve`).
### Include expansion
The `include` module splices transformation blocks from resolved sources into the pipeline:
```rust
use rsigma_runtime::sources::include::expand_includes;
// Modifies pipeline.transformations in place, replacing Include directives
// with the resolved transformation blocks.
expand_includes(&mut pipeline, &resolved_map, allow_remote_include)?;
```
Recursive includes are rejected (max depth 1) to prevent cycles.
### Template expansion
The `template` module replaces `${source.<id>}` and `${source.<id>.<path>}` references in pipeline vars:
```rust
use rsigma_runtime::sources::TemplateExpander;
// Returns a new pipeline with vars expanded using resolved source data.
let expanded_pipeline = TemplateExpander::expand(&pipeline, &resolved_map);
```
## Feature flags
| `logfmt` | Enable logfmt input adapter |
| `cef` | Enable CEF (ArcSight) input adapter |
| `evtx` | Enable EVTX (Windows Event Log) input adapter. Provides `EvtxFileReader` for reading `.evtx` files and iterating records as `serde_json::Value` |
| `nats` | Enable NATS JetStream source and sink, NATS dynamic sources, and NATS control subject |
| `otlp` | Enable OTLP log ingestion types and `LogRecord`-to-JSON conversion |
| `daachorse-index` | Forward to `rsigma-eval/daachorse-index`. Enables `RuntimeEngine::set_cross_rule_ac` and the cross-rule Aho-Corasick pre-filter for large substring-heavy rule sets |
## License
MIT