use anyhow::{Context, Result, anyhow, bail};
use chrono::Utc;
use cloudevents::{EventBuilder, EventBuilderV10};
use serde_json::json;
use std::collections::HashMap;
use crate::configuration::Settings;
use crate::notification::decode_subject_for_display;
use crate::notification::topic_parser::{derive_event_type_from_topic, topic_to_request};
use crate::notification_backend::NotificationMessage;
use cloudevents::AttributesReader;
use tracing::debug;
/// Builds CloudEvents from notification messages.
///
/// Holds the base URL that is used as the event `source` and as the prefix of
/// the data schema URL attached to every event it creates.
#[derive(Debug, Clone)]
pub struct CloudEventCreator {
    /// Base URL used as the CloudEvent source and schema URL prefix.
    base_url: String,
}
impl CloudEventCreator {
    /// Creates a creator that uses `base_url` as the CloudEvent source and as
    /// the prefix of the data schema URL.
    pub fn new(base_url: String) -> Self {
        Self { base_url }
    }

    /// Converts a notification into a CloudEvents v1.0 event.
    ///
    /// The resulting event has:
    /// - `id`: `"{event_type}@{sequence}"`
    /// - `source`: the configured base URL
    /// - `type`: `"int.ecmwf.aviso.{event_type}"`
    /// - `time`: the notification timestamp, or the current UTC time when the
    ///   notification carries none
    /// - `data`: a JSON object with `identifier` and `payload` keys, content
    ///   type `application/json`, schema `"{base_url}/schema/{event_type}"`
    ///
    /// # Errors
    ///
    /// Returns an error when the topic base cannot be derived from the
    /// notification topic, when no event type matches that topic base, when
    /// the request parameters cannot be reconstructed from the topic, or when
    /// the CloudEvent builder rejects the assembled attributes.
    pub fn create_cloud_event(
        &self,
        notification: &NotificationMessage,
    ) -> Result<cloudevents::Event> {
        let topic_base = derive_event_type_from_topic(&notification.topic)
            .context("Failed to extract topic base from notification topic")?;
        let event_type = find_event_type_from_topic_base(&topic_base)
            .context("Failed to determine event type from topic")?;
        let request_params = topic_to_request(&notification.topic, &event_type)
            .context("Failed to reconstruct request parameters from topic")?;
        let data = self.build_cloud_event_data(&request_params, &notification.payload)?;

        let cloud_event = EventBuilderV10::new()
            .id(format!("{}@{}", event_type, notification.sequence))
            .source(&self.base_url)
            .ty(format!("int.ecmwf.aviso.{}", event_type))
            .time(notification.timestamp.unwrap_or_else(Utc::now))
            .data_with_schema(
                "application/json",
                format!("{}/schema/{}", self.base_url, event_type),
                data,
            )
            .build()
            .context("Failed to build CloudEvent")?;

        debug!(
            event_id = cloud_event.id(),
            event_type = %event_type,
            topic = %decode_subject_for_display(&notification.topic),
            sequence = notification.sequence,
            "CloudEvent created successfully"
        );
        Ok(cloud_event)
    }

    /// Assembles the CloudEvent `data` object.
    ///
    /// The result is `{"identifier": <identifier_params>, "payload": <payload>}`
    /// where the payload has been converted by `parse_payload_to_json`.
    fn build_cloud_event_data(
        &self,
        identifier_params: &HashMap<String, String>,
        payload: &str,
    ) -> Result<serde_json::Value> {
        let payload_json = self
            .parse_payload_to_json(payload)
            .context("Failed to parse notification payload as JSON")?;
        Ok(json!({
            "identifier": identifier_params,
            "payload": payload_json
        }))
    }

    /// Converts a raw payload string into a JSON value.
    ///
    /// - An empty payload becomes JSON `null`.
    /// - A payload that parses as JSON is returned as the parsed value.
    /// - Anything else is wrapped verbatim in a JSON string.
    ///
    /// Every branch currently returns `Ok`; the `Result` return type is kept
    /// for the existing callers.
    fn parse_payload_to_json(&self, payload: &str) -> Result<serde_json::Value> {
        // Upper bound, in bytes, on how much of an unparseable payload is logged.
        const PREVIEW_MAX_BYTES: usize = 100;

        if payload.is_empty() {
            return Ok(serde_json::Value::Null);
        }
        match serde_json::from_str::<serde_json::Value>(payload) {
            Ok(json_value) => Ok(json_value),
            Err(_) => {
                // Slicing a `str` at a byte offset inside a multi-byte
                // character panics, so back up to the nearest character
                // boundary. Offset 0 is always a boundary, so this terminates.
                let mut preview_end = payload.len().min(PREVIEW_MAX_BYTES);
                while !payload.is_char_boundary(preview_end) {
                    preview_end -= 1;
                }
                debug!(
                    payload_preview = &payload[..preview_end],
                    "Payload is not valid JSON, treating as string"
                );
                Ok(serde_json::Value::String(payload.to_string()))
            }
        }
    }

    /// Creates a creator whose base URL is taken from the global application
    /// settings.
    pub fn from_global_config() -> Self {
        let app_settings = Settings::get_global_application_settings();
        Self::new(app_settings.base_url.clone())
    }
}
/// Convenience wrapper that builds a CloudEvent for `notification` using a
/// one-off [`CloudEventCreator`] configured with `base_url`.
///
/// # Errors
///
/// Propagates any error returned by [`CloudEventCreator::create_cloud_event`].
pub fn create_cloud_event_from_notification(
    notification: &NotificationMessage,
    base_url: &str,
) -> Result<cloudevents::Event> {
    CloudEventCreator::new(base_url.to_owned()).create_cloud_event(notification)
}
/// Looks up the event type whose schema declares `topic_base` as its topic base.
///
/// Schema entries are scanned in the map's iteration order and the first entry
/// whose `topic.base` equals `topic_base` wins; entries without a topic
/// configuration are skipped.
///
/// # Errors
///
/// Returns an error when no notification schema is configured or when no
/// entry matches `topic_base`.
fn find_event_type_from_topic_base(topic_base: &str) -> Result<String> {
    let schema = Settings::get_global_notification_schema();
    let schema_map = schema
        .as_ref()
        .ok_or_else(|| anyhow!("No notification schema configured"))?;

    let matching_entry = schema_map.iter().find(|(_, event_schema)| {
        event_schema
            .topic
            .as_ref()
            .is_some_and(|topic_config| topic_config.base == topic_base)
    });

    match matching_entry {
        Some((event_type, _)) => {
            debug!(
                topic_base = %topic_base,
                event_type = %event_type,
                "Found event type for topic base using schema"
            );
            Ok(event_type.clone())
        }
        None => bail!("No event type found for topic base: {}", topic_base),
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;

    /// Builds a fixed sample notification shared by the tests in this module.
    fn create_test_notification() -> NotificationMessage {
        let topic = String::from("diss.FOO.E1.od.0001.g.20190810.0.enfo.1");
        let payload = String::from(r#"{"test": "data"}"#);
        NotificationMessage {
            sequence: 123,
            topic,
            payload,
            timestamp: Some(Utc::now()),
            metadata: None,
        }
    }

    /// Covers the three payload conversions: JSON object, plain text, empty.
    #[test]
    fn test_parse_payload_to_json() {
        let creator = CloudEventCreator::new(String::from("http://test.com"));

        // Well-formed JSON is parsed structurally.
        let from_json = creator
            .parse_payload_to_json(r#"{"key": "value"}"#)
            .unwrap();
        assert!(from_json.is_object());

        // Text that is not JSON is wrapped in a JSON string.
        let from_text = creator.parse_payload_to_json("simple string").unwrap();
        assert!(from_text.is_string());

        // An empty payload maps to JSON null.
        let from_empty = creator.parse_payload_to_json("").unwrap();
        assert!(from_empty.is_null());
    }

    /// Sanity-checks the sample notification fixture.
    ///
    /// Note: this only inspects the fixture's fields; it does not call
    /// `create_cloud_event`.
    #[test]
    fn test_cloud_event_creation() {
        let sample = create_test_notification();
        assert_eq!(sample.sequence, 123);
        assert!(sample.topic.starts_with("diss."));
    }
}