use crate::config::SourceConfig;
use crate::event::EmittedEvent;
use anyhow::Context;
use chrono::Utc;
use super::helpers::effective_source_label;
/// Parses a raw response body as JSON and extracts the event values for `source`.
///
/// # Errors
///
/// Fails with the context `"parse response json"` when `body` is not valid JSON, and
/// forwards any error returned by [`parse_events_from_value_for_source`].
pub(crate) fn parse_events_from_body_for_source(
    body: &str,
    source: &SourceConfig,
) -> anyhow::Result<Vec<serde_json::Value>> {
    serde_json::from_str::<serde_json::Value>(body)
        .context("parse response json")
        .and_then(|parsed| parse_events_from_value_for_source(&parsed, source))
}
/// Extracts event values from an already-parsed response, honouring the path
/// settings on `source`.
///
/// When neither `response_events_path` nor `response_event_object_path` is set, the
/// lenient [`parse_events_from_value`] is used; otherwise extraction is delegated to
/// [`parse_events_from_value_with_path`].
pub(crate) fn parse_events_from_value_for_source(
    value: &serde_json::Value,
    source: &SourceConfig,
) -> anyhow::Result<Vec<serde_json::Value>> {
    let configured = (
        source.response_events_path.as_deref(),
        source.response_event_object_path.as_deref(),
    );
    match configured {
        // No path configuration at all: fall back to the heuristic parser.
        (None, None) => parse_events_from_value(value),
        (events_path, object_path) => {
            parse_events_from_value_with_path(value, events_path, object_path)
        }
    }
}
fn parse_events_from_value_with_path(
value: &serde_json::Value,
events_path: Option<&str>,
event_object_path: Option<&str>,
) -> anyhow::Result<Vec<serde_json::Value>> {
let arr = match events_path {
Some(p) => json_path_array(value, p).ok_or_else(|| {
anyhow::anyhow!("response_events_path {:?} did not resolve to an array", p)
})?,
None => {
if let Some(arr) = value.as_array() {
arr.clone()
} else if let Some(obj) = value.as_object() {
for key in &["items", "data", "events", "logs", "entries"] {
if let Some(v) = obj.get(*key).and_then(|v| v.as_array()) {
return Ok(unwrap_event_objects(v, event_object_path));
}
}
anyhow::bail!(
"response has no top-level array or known events key (items/data/events/logs/entries)"
);
} else {
anyhow::bail!("response root is not an object or array");
}
}
};
Ok(unwrap_event_objects(&arr, event_object_path))
}
/// Follows the dot-separated `path` from `value`, looking each segment up with
/// `Value::get`, and returns a copy of the array found at the end.
///
/// Returns `None` when any segment fails to resolve or the final value is not an array.
fn json_path_array(value: &serde_json::Value, path: &str) -> Option<Vec<serde_json::Value>> {
    path.split('.')
        .try_fold(value, |node, segment| node.get(segment))
        .and_then(|leaf| leaf.as_array())
        .cloned()
}
/// Maps each array element to the value found under the dot-separated `object_path`.
///
/// Without an `object_path` the elements are copied unchanged. With one, elements for
/// which the path does not resolve are dropped from the result.
fn unwrap_event_objects(
    arr: &[serde_json::Value],
    object_path: Option<&str>,
) -> Vec<serde_json::Value> {
    match object_path {
        None => arr.to_vec(),
        Some(path) => arr
            .iter()
            .filter_map(|element| {
                path.split('.')
                    .try_fold(element, |node, segment| node.get(segment))
                    .cloned()
            })
            .collect(),
    }
}
pub(crate) fn parse_events_from_value(
value: &serde_json::Value,
) -> anyhow::Result<Vec<serde_json::Value>> {
if let Some(arr) = value.as_array() {
return Ok(arr.clone());
}
if let Some(obj) = value.as_object() {
for key in &["items", "data", "events", "logs", "entries"] {
if let Some(v) = obj.get(*key)
&& let Some(arr) = v.as_array()
{
return Ok(arr.clone());
}
}
}
Ok(vec![value.clone()])
}
/// Follows the dot-separated `path` from `value` and returns the string found there.
///
/// Returns `None` when any segment fails to resolve or the final value is not a JSON
/// string.
pub(crate) fn json_path_str(value: &serde_json::Value, path: &str) -> Option<String> {
    path.split('.')
        .try_fold(value, |node, segment| node.get(segment))?
        .as_str()
        .map(str::to_owned)
}
/// Reads the event identifier located at the dot-separated `id_path` inside `event`.
///
/// Only JSON strings are accepted: a missing path or a non-string value yields `None`.
pub(crate) fn event_id(event: &serde_json::Value, id_path: &str) -> Option<String> {
    let leaf = id_path
        .split('.')
        .try_fold(event, |node, segment| node.get(segment))?;
    leaf.as_str().map(String::from)
}
/// Picks a timestamp for `event` when no usable explicit timestamp field is available.
///
/// The keys `published`, `timestamp`, `ts` and `created_at` are probed in that order
/// and the first one holding a non-empty JSON string wins. If none qualifies, the
/// current UTC time formatted as RFC 3339 is returned.
fn event_ts_fallback(event: &serde_json::Value) -> String {
    ["published", "timestamp", "ts", "created_at"]
        .iter()
        .find_map(|key| {
            // A key that is present but null, non-string or empty must not mask a
            // later key carrying a real timestamp, so keep scanning in that case.
            event
                .get(*key)
                .and_then(|field| field.as_str())
                .filter(|text| !text.is_empty())
        })
        .map_or_else(|| Utc::now().to_rfc3339(), str::to_owned)
}
pub(crate) fn event_ts_with_field(
event: &serde_json::Value,
timestamp_field: Option<&str>,
) -> String {
if let Some(path) = timestamp_field
&& let Some(s) = json_path_str(event, path)
&& !s.is_empty()
{
return s;
}
event_ts_fallback(event)
}
/// Builds the [`EmittedEvent`] for one raw event value.
///
/// The timestamp comes from [`event_ts_with_field`], using the transform's
/// `timestamp_field` when a transform is configured. The label comes from
/// `effective_source_label`. The event carries a copy of `path` and of `event_value`.
/// An id is attached only when the transform has an `id_field` and [`event_id`] finds a
/// value for it in the event.
pub(crate) fn build_emitted_event(
    source: &SourceConfig,
    source_id: &str,
    path: &str,
    event_value: &serde_json::Value,
) -> EmittedEvent {
    let transform = source.transform.as_ref();

    let ts = event_ts_with_field(
        event_value,
        transform.and_then(|t| t.timestamp_field.as_deref()),
    );
    let label = effective_source_label(source, source_id);
    let emitted = EmittedEvent::new(ts, label, path.to_string(), event_value.clone());

    let resolved_id = transform
        .and_then(|t| t.id_field.as_ref())
        .and_then(|id_path| event_id(event_value, id_path));

    match resolved_id {
        Some(id) => emitted.with_id(id),
        None => emitted,
    }
}