use std::time::Instant;
use rsigma_eval::pipeline::sources::{DataFormat, ExtractExpr};
use super::extract::apply_extract;
use super::file::parse_data;
use super::{MAX_SOURCE_RESPONSE_BYTES, ResolvedValue, SourceError, SourceErrorKind};
fn check_payload_size(payload: &[u8], max_bytes: usize) -> Result<(), SourceError> {
if payload.len() > max_bytes {
return Err(SourceError {
source_id: String::new(),
kind: SourceErrorKind::ResourceLimit(format!(
"NATS message payload ({} bytes) exceeds {} byte limit",
payload.len(),
max_bytes
)),
});
}
Ok(())
}
#[cfg(feature = "nats")]
pub async fn resolve_nats_initial(
url: &str,
subject: &str,
format: DataFormat,
extract_expr: Option<&ExtractExpr>,
) -> Result<ResolvedValue, SourceError> {
use futures::StreamExt;
let client = async_nats::connect(url).await.map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Fetch(format!("failed to connect to NATS at {url}: {e}")),
})?;
let mut sub = client
.subscribe(subject.to_string())
.await
.map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Fetch(format!("failed to subscribe to '{subject}': {e}")),
})?;
let data = match tokio::time::timeout(std::time::Duration::from_secs(1), sub.next()).await {
Ok(Some(msg)) => {
check_payload_size(&msg.payload, MAX_SOURCE_RESPONSE_BYTES)?;
let raw = std::str::from_utf8(&msg.payload).map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("NATS message is not valid UTF-8: {e}")),
})?;
let parsed = parse_data(raw, format)?;
if let Some(expr) = extract_expr {
apply_extract(&parsed, expr)?
} else {
parsed
}
}
_ => serde_json::Value::Null,
};
Ok(ResolvedValue {
data,
resolved_at: Instant::now(),
from_cache: false,
})
}
#[cfg(feature = "nats")]
pub fn parse_nats_message(
payload: &[u8],
format: DataFormat,
extract_expr: Option<&ExtractExpr>,
) -> Result<serde_json::Value, SourceError> {
check_payload_size(payload, MAX_SOURCE_RESPONSE_BYTES)?;
let raw = std::str::from_utf8(payload).map_err(|e| SourceError {
source_id: String::new(),
kind: SourceErrorKind::Parse(format!("NATS message is not valid UTF-8: {e}")),
})?;
let parsed = parse_data(raw, format)?;
if let Some(expr) = extract_expr {
apply_extract(&parsed, expr)
} else {
Ok(parsed)
}
}