use anyhow::Result;
use std::collections::HashMap;
use tracing_actix_web::RequestId;
use crate::configuration::Settings;
use crate::notification::{IdentifierConstraint, NotificationHandler, OperationType};
use crate::notification_backend::replay::StartAt;
use crate::types::NotificationRequest;
#[derive(Debug, Clone)]
pub struct StreamingRequestContext {
pub event_type: String,
pub topic: String,
pub canonicalized_params: HashMap<String, String>,
pub identifier_constraints: HashMap<String, IdentifierConstraint>,
pub start_at: StartAt,
pub request_id: RequestId,
}
#[derive(Debug, Clone)]
pub struct ValidationConfig {
pub require_replay_params: bool,
pub operation_type: OperationType,
}
impl ValidationConfig {
pub fn for_watch() -> Self {
Self {
require_replay_params: false,
operation_type: OperationType::Watch,
}
}
pub fn for_replay() -> Self {
Self {
require_replay_params: true,
operation_type: OperationType::Replay,
}
}
}
pub struct StreamingRequestProcessor;
impl StreamingRequestProcessor {
pub fn process_request(
request: &NotificationRequest,
request_id: RequestId,
config: ValidationConfig,
) -> Result<StreamingRequestContext> {
let start_at = Self::validate_replay_parameters(request, &config)?;
let notification_handler =
NotificationHandler::from_config(Settings::get_global_notification_schema().as_ref());
let notification_result = notification_handler.process_request(
&request.event_type,
&request.identifier,
&None, config.operation_type,
)?;
Ok(StreamingRequestContext {
event_type: notification_result.event_type,
topic: notification_result.topic,
canonicalized_params: notification_result.canonicalized_params,
identifier_constraints: notification_result.identifier_constraints,
start_at,
request_id,
})
}
fn validate_replay_parameters(
request: &NotificationRequest,
config: &ValidationConfig,
) -> Result<StartAt> {
request.validate_spatial_filters()?;
let start_at = request.validate_start_at()?;
if config.require_replay_params && matches!(start_at, StartAt::LiveOnly) {
anyhow::bail!(
"Replay endpoint requires either from_id or from_date parameter. \
Use from_id for sequence-based replay or from_date for time-based replay."
);
}
Ok(start_at)
}
}