use crate::error::{
ProcessingKind, RequestKind, processing_error_response, request_parse_error_response,
request_validation_error_response,
};
use crate::handlers::{
NotificationErrorKind, parse_and_validate_request, process_notification_request,
save_to_backend,
};
use crate::metrics::AppMetrics;
use crate::notification::OperationType;
use crate::notification::decode_subject_for_display;
use crate::notification_backend::NotificationBackend;
use crate::routes::streaming::{
StreamOperation, bucket_event_type_for_observability, enforce_known_event_type,
enforce_stream_auth,
};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
use crate::types::{NotificationRequest, NotificationResponse};
use actix_web::{HttpRequest, HttpResponse, web};
use std::sync::Arc;
use tracing::info;
use tracing_actix_web::RequestId;
#[utoipa::path(
post,
path = "/api/v1/notification",
tag = "notification",
request_body = NotificationRequest,
responses(
(status = 200, description = "Notification processed and stored successfully", body = crate::types::NotificationResponse),
(status = 400, description = "Invalid request data or validation failure"),
(status = 401, description = "Missing or invalid credentials (when stream requires auth)"),
(status = 403, description = "Valid credentials but insufficient roles for this stream"),
(status = 500, description = "Internal server error during processing"),
(status = 503, description = "Authentication service unavailable (direct mode)")
),
security(
(),
("bearer_jwt" = []),
("basic" = []),
)
)]
#[tracing::instrument(
skip(body, notification_backend, metrics),
fields(
event_type = tracing::field::Empty,
topic = tracing::field::Empty,
request_id = %request_id,
spatial_enabled = tracing::field::Empty,
)
)]
pub async fn notify(
http_request: HttpRequest,
body: web::Bytes,
notification_backend: web::Data<Arc<dyn NotificationBackend>>,
request_id: RequestId,
metrics: Option<web::Data<AppMetrics>>,
) -> HttpResponse {
let request_id_str = request_id.to_string();
let payload = match parse_and_validate_request(&body) {
Ok(p) => p,
Err(e) => {
record_notification(&metrics, "unknown", "error");
return request_parse_error_response(RequestKind::Notification, e, &request_id_str);
}
};
let event_type = &payload.event_type;
let request_params = &payload.identifier;
if let Err(response) = enforce_known_event_type(&http_request, event_type) {
record_notification(&metrics, "unknown", "rejected");
return response;
}
let event_type_label = bucket_event_type_for_observability(event_type);
tracing::Span::current().record("event_type", event_type_label);
if payload.identifier.contains_key("point") {
record_notification(&metrics, event_type_label, "error");
return request_validation_error_response(
RequestKind::Notification,
anyhow::anyhow!(
"identifier.point is only supported for watch/replay endpoints, not /notification"
),
&request_id_str,
);
}
if let Err(response) = enforce_stream_auth(&http_request, event_type, StreamOperation::Write) {
record_notification(&metrics, event_type_label, "rejected");
return response;
}
let notification_result = match process_notification_request(
event_type,
request_params,
&payload.payload,
OperationType::Notify,
) {
Ok(result) => result,
Err(e) => match e.kind {
NotificationErrorKind::Validation => {
record_notification(&metrics, event_type_label, "error");
return request_validation_error_response(
RequestKind::Notification,
e.source,
&request_id_str,
);
}
NotificationErrorKind::Processing => {
record_notification(&metrics, event_type_label, "error");
return processing_error_response(
ProcessingKind::NotificationProcessing,
e.source,
&request_id_str,
);
}
},
};
let display_topic = decode_subject_for_display(¬ification_result.topic);
tracing::Span::current().record("topic", &display_topic);
tracing::Span::current().record(
"spatial_enabled",
notification_result.spatial_metadata.is_some(),
);
let payload_string = payload
.payload
.as_ref()
.map(serde_json::Value::to_string)
.unwrap_or_else(|| "null".to_string());
let payload_size = payload_string.len();
if let Err(e) = save_to_backend(
¬ification_result,
payload_string,
notification_backend.get_ref().as_ref(),
)
.await
{
record_notification(&metrics, event_type_label, "error");
return processing_error_response(ProcessingKind::NotificationStorage, e, &request_id_str);
}
record_notification(&metrics, event_type_label, "success");
let response = NotificationResponse {
status: "success".to_string(),
request_id: request_id_str,
processed_at: chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true),
};
let payload_kind = payload
.payload
.as_ref()
.map(json_value_kind)
.unwrap_or("null");
let spatial_enabled = notification_result.spatial_metadata.is_some();
let spatial_bbox = notification_result
.spatial_metadata
.as_ref()
.map(|metadata| metadata.bounding_box.as_str());
info!(
service_name = SERVICE_NAME,
service_version = SERVICE_VERSION,
event_name = "api.notification.processed",
outcome = "success",
topic = %display_topic,
event_type = %notification_result.event_type,
param_count = notification_result.canonicalized_params.len(),
payload_kind = %payload_kind,
payload_size = payload_size,
spatial_enabled = spatial_enabled,
spatial_bbox = ?spatial_bbox,
"Notification processed and saved successfully"
);
HttpResponse::Ok().json(response)
}
fn record_notification(metrics: &Option<web::Data<AppMetrics>>, event_type: &str, status: &str) {
if let Some(m) = metrics {
m.notifications_total
.with_label_values(&[event_type, status])
.inc();
}
}
fn json_value_kind(value: &serde_json::Value) -> &'static str {
match value {
serde_json::Value::Null => "null",
serde_json::Value::Bool(_) => "bool",
serde_json::Value::Number(_) => "number",
serde_json::Value::String(_) => "string",
serde_json::Value::Array(_) => "array",
serde_json::Value::Object(_) => "object",
}
}