use actix_web::web;
use anyhow::Result;
use serde_json::Value;
use std::collections::HashMap;
use tracing::debug;
use super::{NotificationProcessor, NotificationRegistry, OperationType, ProcessingResult};
use crate::configuration::Settings;
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
/// Front-end for notification processing that owns a [`NotificationRegistry`].
///
/// Its methods build a `NotificationProcessor` on top of the registry or
/// delegate schema and identifier-key lookups to the registry directly.
pub struct NotificationHandler {
// Registry consulted by every method of this handler.
registry: NotificationRegistry,
}
impl NotificationHandler {
    /// Builds a handler from an optional notification schema map.
    ///
    /// With `Some(schemas)` the registry is created through
    /// `NotificationRegistry::from_config`; with `None` it is created through
    /// `NotificationRegistry::new`.
    pub fn from_config(
        notification_schema: Option<&HashMap<String, crate::configuration::EventSchema>>,
    ) -> Self {
        let registry = match notification_schema {
            Some(schemas) => NotificationRegistry::from_config(schemas),
            None => NotificationRegistry::new(),
        };
        Self { registry }
    }

    /// Processes one request against this handler's registry.
    ///
    /// A `NotificationProcessor` borrowing the registry is created for the
    /// call, and all arguments are forwarded unchanged to its
    /// `process_request_with_values` method.
    ///
    /// # Errors
    ///
    /// Returns whatever error `process_request_with_values` reports.
    pub fn process_request(
        &self,
        event_type: &str,
        request_params: &HashMap<String, Value>,
        payload: &Option<serde_json::Value>,
        operation: OperationType,
    ) -> Result<ProcessingResult> {
        NotificationProcessor::new(&self.registry).process_request_with_values(
            event_type,
            request_params,
            payload,
            operation,
        )
    }

    /// Returns the registry's identifier keys for `event_type`.
    ///
    /// # Errors
    ///
    /// Propagates the error of `NotificationRegistry::get_identifier_keys`.
    pub fn get_identifier_keys(&self, event_type: &str) -> Result<Vec<String>> {
        self.registry.get_identifier_keys(event_type)
    }

    /// Returns the registry's required identifier keys for `event_type`.
    ///
    /// # Errors
    ///
    /// Propagates the error of
    /// `NotificationRegistry::get_required_identifier_keys`.
    pub fn get_required_identifier_keys(&self, event_type: &str) -> Result<Vec<String>> {
        self.registry.get_required_identifier_keys(event_type)
    }

    /// Borrows the complete schema map held by the registry.
    pub fn get_whole_schema(&self) -> &HashMap<String, crate::configuration::EventSchema> {
        self.registry.get_whole_schema()
    }
}
/// Turns an Aviso CloudEvent request body into a [`ProcessingResult`].
///
/// Steps, in order:
/// 1. pull the event type and request parameters out of `payload` with
///    `extract_aviso_data`;
/// 2. build a [`NotificationHandler`] from the global notification schema;
/// 3. run `process_request` with no separate payload (`&None`);
/// 4. overwrite the result's `event_type` with the extracted event type;
/// 5. emit an info-level tracing event describing the processed notification.
///
/// # Errors
///
/// Returns the error from `extract_aviso_data` when the body is malformed, or
/// the error from `process_request` when processing fails.
pub fn extract_aviso_notification(
    payload: &web::Json<Value>,
    operation: OperationType,
) -> Result<ProcessingResult> {
    let (event_type, request_params) = extract_aviso_data(payload)?;

    // A fresh handler is built per call from whatever schema is configured globally.
    let schema = Settings::get_global_notification_schema();
    let handler = NotificationHandler::from_config(schema.as_ref());

    let mut outcome = handler.process_request(&event_type, &request_params, &None, operation)?;
    // The event type reported back is always the one taken from the request body.
    outcome.event_type = event_type;

    tracing::info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "notification.aviso.processed",
        operation = ?operation,
        aviso_event_type = outcome.event_type,
        "Aviso notification processed with operation"
    );

    Ok(outcome)
}
fn extract_aviso_data(payload: &web::Json<Value>) -> Result<(String, HashMap<String, Value>)> {
let data = payload
.get("data")
.ok_or_else(|| anyhow::anyhow!("Aviso CloudEvents must include a 'data' field"))?;
let event_type = data
.get("event")
.and_then(|v| v.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing required 'event' field in Aviso data"))?
.to_string();
let identifier_obj = data
.get("identifier")
.and_then(|v| v.as_object())
.ok_or_else(|| anyhow::anyhow!("Missing required 'identifier' field in Aviso data"))?;
let mut request_params = HashMap::new();
for (key, value) in identifier_obj {
request_params.insert(key.clone(), value.clone());
}
extract_additional_fields(data, &mut request_params);
debug!(
event_type = %event_type,
param_count = request_params.len(),
"Extracted Aviso request parameters"
);
Ok((event_type, request_params))
}
/// Copies the optional `payload` and `location` members of `data` into
/// `request_params`.
///
/// A member is copied only when it is present and is a JSON string; it is
/// stored under its own name as a `Value::String`, replacing any entry that
/// already uses that key. Members that are missing or not strings are skipped.
fn extract_additional_fields(data: &Value, request_params: &mut HashMap<String, Value>) {
    for field in ["payload", "location"] {
        if let Some(text) = data.get(field).and_then(Value::as_str) {
            request_params.insert(field.to_string(), Value::String(text.to_string()));
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use actix_web::web;
    use serde_json::json;

    /// Builds the CloudEvents envelope shared by all tests, attaching `data`
    /// only when one is supplied.
    fn cloud_event(data: Option<Value>) -> web::Json<Value> {
        let mut envelope = json!({
            "specversion": "1.0",
            "id": "test",
            "source": "/test",
            "type": "int.ecmwf.aviso.notify"
        });
        if let Some(data) = data {
            envelope["data"] = data;
        }
        web::Json(envelope)
    }

    /// An envelope without `data` is rejected.
    #[test]
    fn test_extract_aviso_data_missing_data_field() {
        let payload = cloud_event(None);

        assert!(extract_aviso_data(&payload).is_err());
    }

    /// `data` without an `event` member is rejected.
    #[test]
    fn test_extract_aviso_data_missing_event_field() {
        let payload = cloud_event(Some(json!({
            "identifier": { "class": "od" }
        })));

        assert!(extract_aviso_data(&payload).is_err());
    }

    /// `data` without an `identifier` member is rejected with a specific message.
    #[test]
    fn test_extract_aviso_data_missing_identifier_field() {
        let payload = cloud_event(Some(json!({
            "event": "dissemination"
        })));

        let error = extract_aviso_data(&payload)
            .expect_err("data without an identifier must be rejected");
        assert!(
            error
                .to_string()
                .contains("Missing required 'identifier' field")
        );
    }

    /// String `payload` and `location` members end up in the parameter map.
    #[test]
    fn test_extract_additional_fields() {
        let payload = cloud_event(Some(json!({
            "event": "dissemination",
            "identifier": { "class": "od" },
            "payload": "test-payload",
            "location": "/path/to/file"
        })));

        let (_, params) =
            extract_aviso_data(&payload).expect("well-formed Aviso data must be accepted");

        assert_eq!(params.get("payload"), Some(&json!("test-payload")));
        assert_eq!(params.get("location"), Some(&json!("/path/to/file")));
    }

    /// Non-string identifier values keep their JSON type.
    #[test]
    fn test_non_string_value_conversion() {
        let payload = cloud_event(Some(json!({
            "event": "dissemination",
            "identifier": {
                "class": "od",
                "step": 12,
                "active": true
            }
        })));

        let (_, params) =
            extract_aviso_data(&payload).expect("well-formed Aviso data must be accepted");

        assert_eq!(params.get("step"), Some(&json!(12)));
        assert_eq!(params.get("active"), Some(&json!(true)));
    }
}