// rsigma_eval/pipeline/sources.rs
//! Dynamic source declarations and template references for dynamic Sigma pipelines.
//!
//! This module defines the types for declaring external data sources in a pipeline
//! YAML (`sources` section) and for tracking `${source.*}` template references
//! found throughout the pipeline.
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::Duration;
10
// =============================================================================
// Dynamic source declaration
// =============================================================================
14
15/// A dynamic source declared in the pipeline's `sources` section.
16///
17/// Each source describes how to fetch external data that can be referenced
18/// by `${source.<id>}` expressions anywhere in the pipeline YAML.
19#[derive(Debug, Clone, PartialEq)]
20pub struct DynamicSource {
21 /// Unique identifier for this source, referenced in `${source.<id>}` expressions.
22 pub id: String,
23 /// The type-specific configuration for fetching data.
24 pub source_type: SourceType,
25 /// How often the source data should be refreshed.
26 pub refresh: RefreshPolicy,
27 /// Maximum time to wait for a fetch to complete.
28 pub timeout: Option<Duration>,
29 /// What to do when a fetch fails.
30 pub on_error: ErrorPolicy,
31 /// Whether the daemon must resolve this source before processing events.
32 pub required: bool,
33 /// Fallback value if the source cannot be resolved.
34 pub default: Option<yaml_serde::Value>,
35}
36
37/// Type-specific configuration for a dynamic source.
38#[derive(Debug, Clone, PartialEq)]
39pub enum SourceType {
40 /// Fetch data from an HTTP endpoint.
41 Http {
42 url: String,
43 method: Option<String>,
44 headers: HashMap<String, String>,
45 format: DataFormat,
46 extract: Option<ExtractExpr>,
47 },
48 /// Run a local command and capture its stdout.
49 Command {
50 command: Vec<String>,
51 format: DataFormat,
52 extract: Option<ExtractExpr>,
53 },
54 /// Read data from a local file.
55 File {
56 path: PathBuf,
57 format: DataFormat,
58 extract: Option<ExtractExpr>,
59 },
60 /// Subscribe to a NATS subject for push-based updates.
61 Nats {
62 url: String,
63 subject: String,
64 format: DataFormat,
65 extract: Option<ExtractExpr>,
66 },
67}
68
/// Expression used to extract a value from source data once it has been parsed.
///
/// Two syntax forms are supported in YAML:
/// - Plain string, always treated as jq (the common case): `extract: ".emails[]"`
/// - Structured object with an explicit language:
///   `extract: { expr: "$.emails[*]", type: jsonpath }`
///
/// Every variant wraps a `String`, so equality is total and the type derives
/// `Eq` alongside `PartialEq`, matching the other enums in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtractExpr {
    /// A jq expression (the default language). Evaluated via jaq.
    Jq(String),
    /// A JSONPath expression. Evaluated via serde_json_path.
    JsonPath(String),
    /// A CEL (Common Expression Language) expression. Evaluated via cel-interpreter.
    Cel(String),
}
83
/// Refresh schedule of a source.
///
/// The only payload is a `Duration`, so equality is total and the type derives
/// `Eq` alongside `PartialEq`, matching the other enums in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefreshPolicy {
    /// Fetch at startup only; the value is never refreshed.
    Once,
    /// Re-fetch on a fixed interval.
    Interval(Duration),
    /// Watch the file for changes (file sources only).
    Watch,
    /// Update the value on each incoming NATS message (NATS sources only).
    Push,
    /// Fetch at startup, then only when explicitly triggered via API/signal.
    OnDemand,
}
98
/// Policy applied when fetching a source fails.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorPolicy {
    /// Keep using the value from the last successful fetch.
    UseCached,
    /// Treat the failure as a pipeline load failure: exit at startup, keep the
    /// previous state at runtime.
    Fail,
    /// Use the declared `default` value instead.
    UseDefault,
}
109
/// Format in which a source returns its data.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataFormat {
    /// JSON data, parsed with serde_json.
    Json,
    /// YAML data, parsed with yaml_serde.
    Yaml,
    /// Line-oriented data: one value per line.
    Lines,
    /// Comma-separated values.
    Csv,
}
122
// =============================================================================
// Source references (detected during parsing)
// =============================================================================
126
127/// A `${source.*}` template reference found in the pipeline YAML.
128#[derive(Debug, Clone, PartialEq)]
129pub struct SourceRef {
130 /// The source ID (first path segment after `source.`).
131 pub source_id: String,
132 /// Optional dot-path into the source data (e.g., `field_mapping` in `${source.env_config.field_mapping}`).
133 pub sub_path: Option<String>,
134 /// Where in the pipeline this reference appears.
135 pub location: RefLocation,
136 /// The raw template string as it appeared in the YAML.
137 pub raw_template: String,
138}
139
/// Position within the pipeline at which a source reference appears.
///
/// All fields are `String` or `usize`, so equality is total and the type
/// derives `Eq` alongside `PartialEq`, matching the other enums in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefLocation {
    /// Inside the `vars` section, under the given variable name.
    Var {
        /// Name of the variable whose value contains the reference.
        var_name: String,
    },
    /// Inside a field value of a transformation.
    TransformationField {
        /// Index of the transformation.
        transform_index: usize,
        /// Name of the field holding the reference.
        field_name: String,
    },
    /// An `include` directive in the transformations list.
    Include {
        /// Index of the directive within the transformations list.
        transform_index: usize,
    },
}
153
// =============================================================================
// Source status (for PipelineState tracking)
// =============================================================================
157
/// Current resolution state of a dynamic source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SourceStatus {
    /// No resolution has happened for the source yet.
    Pending,
    /// The source was resolved successfully.
    Resolved,
    /// Resolving the source failed.
    Failed,
}