pub mod cache;
pub mod command;
pub mod extract;
pub mod file;
pub mod http;
pub mod include;
#[cfg(feature = "nats")]
pub mod nats;
pub mod refresh;
pub mod template;
use std::time::{Duration, Instant};
use rsigma_eval::pipeline::sources::{DynamicSource, ErrorPolicy, SourceType};
/// Response size cap for dynamic sources, in bytes: 10 MiB (`10 * 1024 * 1024`).
// NOTE(review): nothing in this file enforces this limit; presumably the fetcher submodules
// (`file`, `command`, `http`, ...) check it and report `SourceErrorKind::ResourceLimit` — confirm.
pub const MAX_SOURCE_RESPONSE_BYTES: usize = 10 * 1024 * 1024;
/// Smallest refresh interval for dynamic sources: one second.
// NOTE(review): presumably used by the `refresh` module to clamp configured intervals — confirm.
pub const MIN_REFRESH_INTERVAL: Duration = Duration::from_secs(1);
pub use cache::SourceCache;
pub use template::TemplateExpander;
/// A value successfully obtained for a dynamic source.
#[derive(Debug, Clone)]
pub struct ResolvedValue {
    /// The source's payload, as JSON.
    pub data: serde_json::Value,
    /// When this value was produced. `DefaultSourceResolver` stamps cache/default fallbacks
    /// with `Instant::now()` at fallback time, not with the time of the original fetch.
    pub resolved_at: Instant,
    /// `true` when `DefaultSourceResolver` served this value from its cache after a failed
    /// fetch; it sets `false` for default-value fallbacks (presumably fetchers set `false` too).
    pub from_cache: bool,
}
/// Failure to resolve a dynamic source, tagged with the id of that source.
#[derive(Debug, Clone)]
pub struct SourceError {
    /// Id of the source that could not be resolved.
    pub source_id: String,
    /// Category and detail of the failure.
    pub kind: SourceErrorKind,
}

impl std::fmt::Display for SourceError {
    /// Renders as `source '<id>': <kind>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { source_id, kind } = self;
        write!(f, "source '{}': {}", source_id, kind)
    }
}

impl std::error::Error for SourceError {}

/// What went wrong while resolving a source; most variants carry a human-readable detail.
#[derive(Debug, Clone)]
pub enum SourceErrorKind {
    /// The fetch step failed.
    Fetch(String),
    /// The fetched payload could not be parsed.
    Parse(String),
    /// Extracting the wanted value from the parsed payload failed.
    Extract(String),
    /// The operation did not finish in time.
    Timeout,
    /// A resource limit was exceeded.
    ResourceLimit(String),
}

impl std::fmt::Display for SourceErrorKind {
    /// Renders a short, lowercase description followed by the detail, if any.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SourceErrorKind::Timeout => f.write_str("timed out"),
            SourceErrorKind::Fetch(msg) => write!(f, "fetch failed: {msg}"),
            SourceErrorKind::Parse(msg) => write!(f, "parse failed: {msg}"),
            SourceErrorKind::Extract(msg) => write!(f, "extract failed: {msg}"),
            SourceErrorKind::ResourceLimit(msg) => {
                write!(f, "resource limit exceeded: {msg}")
            }
        }
    }
}
/// Asynchronous strategy for turning a [`DynamicSource`] into a [`ResolvedValue`].
///
/// Every implementor must be `Send + Sync`, so a resolver can be shared across threads.
/// The async method is provided through the `async_trait` macro.
#[async_trait::async_trait]
pub trait SourceResolver: Send + Sync {
    /// Obtains the current value of `source`.
    ///
    /// # Errors
    /// Returns a [`SourceError`] when no value can be produced for `source`.
    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError>;
}
/// Built-in [`SourceResolver`]: dispatches each source to the matching fetcher and keeps the
/// last successfully fetched value per source in a [`SourceCache`], which backs the
/// `UseCached` error policy.
pub struct DefaultSourceResolver {
    cache: SourceCache,
}

impl DefaultSourceResolver {
    /// Creates a resolver around a cache built with [`SourceCache::new`].
    pub fn new() -> Self {
        Self::with_cache(SourceCache::new())
    }

    /// Creates a resolver that uses the supplied `cache`.
    pub fn with_cache(cache: SourceCache) -> Self {
        DefaultSourceResolver { cache }
    }

    /// Borrows the resolver's cache.
    pub fn cache(&self) -> &SourceCache {
        &self.cache
    }
}

impl Default for DefaultSourceResolver {
    /// Same as [`DefaultSourceResolver::new`].
    fn default() -> Self {
        DefaultSourceResolver::new()
    }
}
#[async_trait::async_trait]
impl SourceResolver for DefaultSourceResolver {
    /// Fetches `source` with the fetcher matching its [`SourceType`], and applies the source's
    /// [`ErrorPolicy`] if that fetch fails.
    ///
    /// On success the fetched data is stored in the cache under `source.id`. On failure the
    /// error's `source_id` is overwritten with `source.id`, and then:
    /// - `UseCached`: the cached value, if one exists, is returned with `from_cache: true`;
    /// - `UseDefault`: the source's `default`, if set, is converted to JSON and returned;
    /// - `Fail`, or no fallback available: the error is returned.
    ///
    /// Built without the `nats` feature, a NATS source fails immediately with a `Fetch`
    /// error and the error policy is not consulted.
    async fn resolve(&self, source: &DynamicSource) -> Result<ResolvedValue, SourceError> {
        let fetched = match &source.source_type {
            SourceType::File {
                path,
                format,
                extract,
            } => file::resolve_file(path, *format, extract.as_ref()).await,
            SourceType::Command {
                command,
                format,
                extract,
            } => {
                command::resolve_command(command, *format, extract.as_ref(), source.timeout)
                    .await
            }
            SourceType::Http {
                url,
                method,
                headers,
                format,
                extract,
            } => {
                http::resolve_http(
                    url,
                    method.as_deref(),
                    headers,
                    *format,
                    extract.as_ref(),
                    source.timeout,
                )
                .await
            }
            #[cfg(feature = "nats")]
            SourceType::Nats {
                url,
                subject,
                format,
                extract,
            } => nats::resolve_nats_initial(url, subject, *format, extract.as_ref()).await,
            #[cfg(not(feature = "nats"))]
            SourceType::Nats { .. } => {
                return Err(SourceError {
                    source_id: source.id.clone(),
                    kind: SourceErrorKind::Fetch("NATS source requires the 'nats' feature".into()),
                });
            }
        };

        let mut err = match fetched {
            Ok(value) => {
                tracing::debug!(source_id = %source.id, "Source fetched successfully");
                self.cache.store(&source.id, &value.data);
                return Ok(value);
            }
            Err(err) => err,
        };
        // The fetchers are not handed the source id, so stamp it on the error here.
        err.source_id = source.id.clone();

        // `Some((data, from_cache))` when the error policy can supply a substitute value.
        let fallback = match source.on_error {
            ErrorPolicy::UseCached => self.cache.get(&source.id).map(|cached| {
                tracing::warn!(
                    source_id = %source.id,
                    error = %err,
                    "Source resolution failed, using cached value"
                );
                (cached, true)
            }),
            ErrorPolicy::UseDefault => source.default.as_ref().map(|default| {
                tracing::warn!(
                    source_id = %source.id,
                    error = %err,
                    "Source resolution failed, using default value"
                );
                (yaml_value_to_json(default), false)
            }),
            ErrorPolicy::Fail => None,
        };

        match fallback {
            Some((data, from_cache)) => Ok(ResolvedValue {
                data,
                resolved_at: Instant::now(),
                from_cache,
            }),
            None => Err(err),
        }
    }
}
/// Resolves every source in `sources` into a map keyed by source id, without recording
/// per-source status anywhere.
///
/// This is [`resolve_all_with_state`] with no pipeline state; see that function for how
/// required and optional failures are treated.
///
/// # Errors
/// Returns the error of the first required source that fails to resolve.
pub async fn resolve_all(
    resolver: &dyn SourceResolver,
    sources: &[DynamicSource],
) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
    let untracked = None;
    resolve_all_with_state(resolver, sources, untracked).await
}
/// Resolves every source in `sources`, one at a time and in order, into a map keyed by
/// source id.
///
/// When `state` is supplied, each source is marked there as resolved or failed as soon as
/// its outcome is known. Any `Ok` from `resolver` counts as resolved, including cache or
/// default fallbacks.
///
/// If a source whose `required` flag is set fails, the call stops and returns that error;
/// values resolved so far are dropped. If an optional source fails, a warning is logged and
/// the source maps to JSON `null`. Should two sources share an id, the later one's entry wins.
///
/// # Errors
/// Returns the [`SourceError`] of the first required source that fails.
pub async fn resolve_all_with_state(
    resolver: &dyn SourceResolver,
    sources: &[DynamicSource],
    mut state: Option<&mut rsigma_eval::pipeline::state::PipelineState>,
) -> Result<std::collections::HashMap<String, serde_json::Value>, SourceError> {
    let mut by_id = std::collections::HashMap::new();
    for source in sources {
        let outcome = resolver.resolve(source).await;

        if let Some(tracker) = state.as_deref_mut() {
            if outcome.is_ok() {
                tracker.mark_source_resolved(&source.id);
            } else {
                tracker.mark_source_failed(&source.id);
            }
        }

        let data = match outcome {
            Ok(resolved) => resolved.data,
            Err(e) if source.required => return Err(e),
            Err(e) => {
                tracing::warn!(
                    source_id = %source.id,
                    error = %e,
                    "Optional source resolution failed, using null"
                );
                serde_json::Value::Null
            }
        };
        by_id.insert(source.id.clone(), data);
    }
    Ok(by_id)
}
/// Converts a YAML value into the equivalent [`serde_json::Value`].
///
/// Conversion rules:
/// - Integers stay integers: `i64` is tried first, then `u64`. Any other number becomes an
///   `f64`, and a non-finite float (`.nan`, `.inf`), which JSON cannot represent, becomes `null`.
/// - YAML tags are discarded and only the tagged value is converted.
/// - Mapping keys become JSON object keys. String keys are used verbatim. Every other key is
///   rendered as the JSON text of its converted value (`80` → `"80"`, `true` → `"true"`,
///   `~` → `"null"`, `[1, 2]` → `"[1,2]"`).
/// - If two keys render to the same string, the later entry wins.
pub fn yaml_value_to_json(yaml: &yaml_serde::Value) -> serde_json::Value {
    match yaml {
        yaml_serde::Value::Null => serde_json::Value::Null,
        yaml_serde::Value::Bool(b) => serde_json::Value::Bool(*b),
        yaml_serde::Value::Number(n) => {
            if let Some(i) = n.as_i64() {
                serde_json::Value::Number(i.into())
            } else if let Some(u) = n.as_u64() {
                serde_json::Value::Number(u.into())
            } else if let Some(f) = n.as_f64() {
                // `From<f64>` yields `Null` for NaN/infinity, which JSON cannot encode.
                serde_json::Value::from(f)
            } else {
                serde_json::Value::Null
            }
        }
        yaml_serde::Value::String(s) => serde_json::Value::String(s.clone()),
        yaml_serde::Value::Sequence(seq) => {
            serde_json::Value::Array(seq.iter().map(yaml_value_to_json).collect())
        }
        yaml_serde::Value::Mapping(map) => {
            let obj = map
                .iter()
                .map(|(k, v)| (yaml_key_to_json_key(k), yaml_value_to_json(v)))
                .collect();
            serde_json::Value::Object(obj)
        }
        yaml_serde::Value::Tagged(tagged) => yaml_value_to_json(&tagged.value),
    }
}

/// Turns a YAML mapping key into a JSON object key (JSON keys must be strings).
///
/// Non-string keys are rendered as the JSON text of their converted value rather than with
/// `Debug` formatting. A numeric key such as `80` therefore becomes `"80"`, the key a lookup
/// would actually use, instead of a debug dump of the YAML value's internal representation.
fn yaml_key_to_json_key(key: &yaml_serde::Value) -> String {
    match yaml_value_to_json(key) {
        // Plain string keys, and tagged strings, are used verbatim (no JSON quoting).
        serde_json::Value::String(s) => s,
        other => other.to_string(),
    }
}