rsigma-runtime
Streaming runtime for rsigma — input format adapters, batch log processing, hot-reload, dynamic source resolution, and pluggable metrics.
Features
- Input adapters: JSON/NDJSON, syslog (RFC 3164/5424), logfmt, CEF, EVTX (Windows Event Log), plain text, and auto-detect. Line-oriented adapters parse raw log lines into typed events implementing the rsigma_eval::Event trait. The EVTX adapter reads binary .evtx files directly via EvtxFileReader and yields serde_json::Value records.
- LogProcessor: batch evaluation pipeline with atomic engine swap via ArcSwap, MetricsHook for pluggable metrics, and EventFilter for JSON payload extraction.
- RuntimeEngine: wraps Engine and CorrelationEngine with rule loading, reload, and correlation state management.
- Dynamic source resolution: SourceResolver trait with DefaultSourceResolver implementation fetching data from files, commands, HTTP APIs, and NATS subjects. Includes template expansion, extraction (jq/JSONPath/CEL), caching with TTL, and scheduled refresh.
- I/O: EventSource trait (stdin, HTTP, NATS) and Sink enum (stdout, file, NATS) with fan-out support.
- OTLP: LogRecord-to-JSON conversion for OpenTelemetry log ingestion (feature-gated under otlp). Resource and log attributes are flattened for direct Sigma rule matching.
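The atomic engine swap mentioned for LogProcessor can be pictured with a small std-only stand-in. This is a sketch, not the crate's code: `Swappable` is a hypothetical name, and it uses `RwLock<Arc<T>>` where the crate uses ArcSwap. It only shows the idea that readers keep the snapshot they loaded while a reload installs a new one.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for an ArcSwap-style holder (not rsigma-runtime API).
struct Swappable<T> {
    current: RwLock<Arc<T>>,
}

impl<T> Swappable<T> {
    fn new(value: T) -> Self {
        Self { current: RwLock::new(Arc::new(value)) }
    }

    /// Returns a cheap clone of the current snapshot.
    /// The caller keeps using it even if a reload happens afterwards.
    fn load(&self) -> Arc<T> {
        self.current.read().unwrap().clone()
    }

    /// Installs a new snapshot; earlier snapshots stay valid until dropped.
    fn store(&self, value: T) {
        *self.current.write().unwrap() = Arc::new(value);
    }
}
```

A batch that called `load()` before a reload finishes against the old engine, and the next batch picks up the new one.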
Usage
use std::sync::Arc;
use CorrelationConfig;
use ;
let mut engine = new;
engine.load_rules.unwrap;
let processor = new;
let batch = vec!;
let results = processor.process_batch_with_format;
See the examples directory for complete working programs.
Dynamic Source Resolution
The sources module provides the infrastructure for resolving external data at pipeline load time. This allows pipeline values to be populated from live data rather than hardcoded.
Core types
use ;
// Create a resolver with in-memory cache
let resolver = new;
// Or with SQLite-backed persistence and TTL
use std::time::Duration;
let cache = with_sqlite_and_ttl.unwrap;
let resolver = with_cache;
Resolution flow
- Fetch: the source type (file, command, HTTP, NATS) determines how raw data is obtained.
- Parse: raw bytes are parsed according to the declared DataFormat (JSON, YAML, lines, CSV).
- Extract: an optional expression (jq, JSONPath, or CEL) selects a subset of the parsed data.
- Cache: successful results are stored for use_cached error policy fallback.
- Template expansion: TemplateExpander substitutes ${source.<id>} references with resolved values.
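The Cache step can be illustrated with a minimal sketch of a last-known-good fallback. `LastGoodCache` and its `resolve` method are hypothetical names, not the crate's API, and the sketch leaves out TTL handling and persistence. It only shows how a stored successful result can stand in when a later fetch fails.

```rust
use std::collections::HashMap;

/// Hypothetical last-known-good cache (not rsigma-runtime API).
#[derive(Default)]
struct LastGoodCache {
    entries: HashMap<String, String>,
}

impl LastGoodCache {
    /// Takes the outcome of a fetch for source `id`.
    /// On success the value is stored and returned.
    /// On failure the previously stored value is returned if one exists,
    /// otherwise the original error is passed through.
    fn resolve(&mut self, id: &str, fetched: Result<String, String>) -> Result<String, String> {
        match fetched {
            Ok(value) => {
                self.entries.insert(id.to_string(), value.clone());
                Ok(value)
            }
            Err(err) => match self.entries.get(id) {
                Some(cached) => Ok(cached.clone()),
                None => Err(err),
            },
        }
    }
}
```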
Resolving all sources
use resolve_all;
// sources: &[DynamicSource] from the parsed pipeline
let resolved_map = resolve_all.await?;
// resolved_map: HashMap<String, serde_json::Value>
// Or with PipelineState tracking:
use resolve_all_with_state;
let resolved_map = resolve_all_with_state.await?;
Extraction languages
The extract module supports three languages for selecting data from resolved sources:
| Language | Use case | Example |
|---|---|---|
| jq | Complex transformations, array iteration, filtering | .indicators[].ip |
| JSONPath | Simple path queries into nested JSON | $.data.items[*].value |
| CEL | Typed expressions with filtering and aggregation | data.filter(x, x.score > 7) |
Refresh scheduling
The refresh module manages automatic re-resolution:
- Interval: periodic timer fires resolution on a configurable cadence.
- Watch: file system notifications (via notify) trigger re-resolution when a file source changes.
- Push: NATS messages on the source subject trigger immediate updates.
- OnDemand: resolution only happens when triggered via API, SIGHUP, or NATS control subject (rsigma.control.resolve).
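The four modes above can be read as a mapping from events to re-resolution. The sketch below is an illustration only: `RefreshMode`, `Trigger`, and `should_resolve` are hypothetical names, and it assumes each mode reacts solely to its own kind of trigger, which the crate may not enforce.

```rust
use std::time::Duration;

/// Hypothetical mirror of the refresh modes listed above.
#[derive(Debug, Clone, Copy, PartialEq)]
enum RefreshMode {
    Interval(Duration),
    Watch,
    Push,
    OnDemand,
}

/// Hypothetical events that could ask for a re-resolution.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Trigger {
    TimerTick,
    FileChanged,
    NatsMessage,
    /// API call, SIGHUP, or a message on the control subject.
    Manual,
}

/// Assumption: a source re-resolves only on the trigger matching its mode.
fn should_resolve(mode: RefreshMode, trigger: Trigger) -> bool {
    matches!(
        (mode, trigger),
        (RefreshMode::Interval(_), Trigger::TimerTick)
            | (RefreshMode::Watch, Trigger::FileChanged)
            | (RefreshMode::Push, Trigger::NatsMessage)
            | (RefreshMode::OnDemand, Trigger::Manual)
    )
}
```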
Include expansion
The include module splices transformation blocks from resolved sources into the pipeline:
use expand_includes;
// Modifies pipeline.transformations in place, replacing Include directives
// with the resolved transformation blocks.
expand_includes?;
Recursive includes are rejected (max depth 1) to prevent cycles.
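The depth-1 rule can be sketched with simplified types. `Step` and `expand_includes_sketch` are hypothetical and much simpler than the crate's pipeline types. The sketch splices resolved blocks in place of each include and rejects any block that is itself an include.

```rust
use std::collections::HashMap;

/// Hypothetical, simplified pipeline step (not the crate's type).
#[derive(Debug, Clone, PartialEq)]
enum Step {
    Include(String),
    Transform(String),
}

/// Replaces each `Include(id)` with the blocks resolved for `id`.
/// An include found inside resolved blocks is an error, so expansion
/// never goes deeper than one level and cannot loop.
fn expand_includes_sketch(
    steps: &[Step],
    resolved: &HashMap<String, Vec<Step>>,
) -> Result<Vec<Step>, String> {
    let mut out = Vec::new();
    for step in steps {
        match step {
            Step::Transform(_) => out.push(step.clone()),
            Step::Include(id) => {
                let blocks = resolved
                    .get(id)
                    .ok_or_else(|| format!("unresolved include source: {}", id))?;
                for block in blocks {
                    if let Step::Include(inner) = block {
                        return Err(format!("nested include of {} inside {}", inner, id));
                    }
                    out.push(block.clone());
                }
            }
        }
    }
    Ok(out)
}
```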
Template expansion
The template module replaces ${source.<id>} and ${source.<id>.<path>} references in pipeline vars:
use TemplateExpander;
// Returns a new pipeline with vars expanded using resolved source data.
let expanded_pipeline = expand;
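To make the substitution concrete, here is a minimal std-only sketch of replacing ${source.<id>} references in a string. `expand_refs` is a hypothetical helper, not the crate's TemplateExpander. It works on plain strings, does not handle the ${source.<id>.<path>} form, and assumes unknown references are left untouched.

```rust
use std::collections::HashMap;

/// Hypothetical helper (not rsigma-runtime API): replaces every
/// `${source.<id>}` reference in `input` with the matching value.
/// Unknown or unterminated references are copied through unchanged.
fn expand_refs(input: &str, resolved: &HashMap<String, String>) -> String {
    const OPEN: &str = "${source.";
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find(OPEN) {
        // Copy everything before the reference verbatim.
        out.push_str(&rest[..start]);
        let after_open = &rest[start + OPEN.len()..];
        match after_open.find('}') {
            Some(end) => {
                let id = &after_open[..end];
                match resolved.get(id) {
                    Some(value) => out.push_str(value),
                    // Unknown id: keep the original reference text.
                    None => out.push_str(&rest[start..start + OPEN.len() + end + 1]),
                }
                rest = &after_open[end + 1..];
            }
            None => {
                // No closing brace: emit the remainder as-is and stop.
                out.push_str(&rest[start..]);
                rest = "";
            }
        }
    }
    out.push_str(rest);
    out
}
```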
Feature flags
| Flag | Description |
|---|---|
| logfmt | Enable logfmt input adapter |
| cef | Enable CEF (ArcSight) input adapter |
| evtx | Enable EVTX (Windows Event Log) input adapter. Provides EvtxFileReader for reading .evtx files and iterating records as serde_json::Value |
| nats | Enable NATS JetStream source and sink, NATS dynamic sources, and NATS control subject |
| otlp | Enable OTLP log ingestion types and LogRecord-to-JSON conversion |
License
MIT