Skip to main content

rsigma_eval/pipeline/
sources.rs

1//! Dynamic source declarations and template references for dynamic Sigma pipelines.
2//!
3//! This module defines the types for declaring external data sources in a pipeline
4//! YAML (`sources` section) and for tracking `${source.*}` template references
5//! found throughout the pipeline.
6
7use std::collections::HashMap;
8use std::path::PathBuf;
9use std::time::Duration;
10
11// =============================================================================
12// Dynamic source declaration
13// =============================================================================
14
15/// A dynamic source declared in the pipeline's `sources` section.
16///
17/// Each source describes how to fetch external data that can be referenced
18/// by `${source.<id>}` expressions anywhere in the pipeline YAML.
19#[derive(Debug, Clone, PartialEq)]
20pub struct DynamicSource {
21    /// Unique identifier for this source, referenced in `${source.<id>}` expressions.
22    pub id: String,
23    /// The type-specific configuration for fetching data.
24    pub source_type: SourceType,
25    /// How often the source data should be refreshed.
26    pub refresh: RefreshPolicy,
27    /// Maximum time to wait for a fetch to complete.
28    pub timeout: Option<Duration>,
29    /// What to do when a fetch fails.
30    pub on_error: ErrorPolicy,
31    /// Whether the daemon must resolve this source before processing events.
32    pub required: bool,
33    /// Fallback value if the source cannot be resolved.
34    pub default: Option<yaml_serde::Value>,
35}
36
37/// Type-specific configuration for a dynamic source.
38#[derive(Debug, Clone, PartialEq)]
39pub enum SourceType {
40    /// Fetch data from an HTTP endpoint.
41    Http {
42        url: String,
43        method: Option<String>,
44        headers: HashMap<String, String>,
45        format: DataFormat,
46        extract: Option<ExtractExpr>,
47    },
48    /// Run a local command and capture its stdout.
49    Command {
50        command: Vec<String>,
51        format: DataFormat,
52        extract: Option<ExtractExpr>,
53    },
54    /// Read data from a local file.
55    File {
56        path: PathBuf,
57        format: DataFormat,
58        extract: Option<ExtractExpr>,
59    },
60    /// Subscribe to a NATS subject for push-based updates.
61    Nats {
62        url: String,
63        subject: String,
64        format: DataFormat,
65        extract: Option<ExtractExpr>,
66    },
67}
68
/// An extraction expression applied to source data after parsing.
///
/// Two YAML spellings are supported:
/// - a plain string, which is always treated as jq (the common case), e.g.
///   `extract: ".emails[]"`;
/// - a structured object that names the language explicitly, e.g.
///   `extract: { expr: "$.emails[*]", type: jsonpath }`.
///
/// Every variant only wraps the expression text, so equality is a plain
/// comparison of variant and text. `Eq` is derived alongside `PartialEq`
/// to match the other comparable types in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExtractExpr {
    /// A jq expression (the default language). Evaluated via jaq.
    Jq(String),
    /// A JSONPath expression. Evaluated via serde_json_path.
    JsonPath(String),
    /// A CEL (Common Expression Language) expression. Evaluated via cel-interpreter.
    Cel(String),
}
83
/// How often a source should be refreshed.
///
/// The only payload is a [`Duration`], which has total equality, so `Eq` is
/// derived alongside `PartialEq` to match the other comparable types in this
/// module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefreshPolicy {
    /// Fetch at startup only, never refresh.
    Once,
    /// Re-fetch on a fixed interval; the payload is that interval.
    Interval(Duration),
    /// Watch the file for changes (file sources only).
    Watch,
    /// Value updated on each incoming NATS message (NATS sources only).
    Push,
    /// Fetch at startup, then only when explicitly triggered via API/signal.
    OnDemand,
}
98
/// Reaction to a failed source fetch.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ErrorPolicy {
    /// Keep working with the value from the last successful fetch.
    UseCached,
    /// Treat the failure as a pipeline load failure: at startup the process
    /// exits, at runtime the previous state is kept.
    Fail,
    /// Substitute the `default` value declared for the source.
    UseDefault,
}
109
/// Format in which a source returns its data.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DataFormat {
    /// JSON, parsed with serde_json.
    Json,
    /// YAML, parsed with yaml_serde.
    Yaml,
    /// Line-oriented text: each line is one value.
    Lines,
    /// Comma-separated values.
    Csv,
}
122
123// =============================================================================
124// Source references (detected during parsing)
125// =============================================================================
126
/// A `${source.*}` template reference found in the pipeline YAML.
///
/// All fields are strings or a [`RefLocation`], so equality is total and
/// `Eq` is derived alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceRef {
    /// The source ID (first path segment after `source.`).
    pub source_id: String,
    /// Optional dot-path into the source data (e.g., `field_mapping` in
    /// `${source.env_config.field_mapping}`); `None` when the template has no
    /// such path.
    pub sub_path: Option<String>,
    /// Where in the pipeline this reference appears.
    pub location: RefLocation,
    /// The raw template string as it appeared in the YAML.
    pub raw_template: String,
}

/// Where in the pipeline a source reference appears.
///
/// Variants carry only names and list indices, so `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RefLocation {
    /// In the `vars` section, under the given variable name.
    Var {
        /// Name of the variable whose value contains the reference.
        var_name: String,
    },
    /// In a transformation's field value.
    TransformationField {
        /// Index of the transformation in the transformations list.
        transform_index: usize,
        /// Name of the field whose value contains the reference.
        field_name: String,
    },
    /// An `include` directive in the transformations list.
    Include {
        /// Index of the directive in the transformations list.
        transform_index: usize,
    },
}
153
154// =============================================================================
155// Source status (for PipelineState tracking)
156// =============================================================================
157
/// State of a dynamic source with respect to resolution.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SourceStatus {
    /// Resolution has not happened yet.
    Pending,
    /// Resolution succeeded.
    Resolved,
    /// Resolution was attempted and failed.
    Failed,
}