aviso-server 0.5.0

Notification service for data-driven workflows with live and replay APIs.
// (C) Copyright 2024- ECMWF and individual contributors.
//
// This software is licensed under the terms of the Apache Licence Version 2.0
// which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
// In applying this licence, ECMWF does not waive the privileges and immunities
// granted to it by virtue of its status as an intergovernmental organisation nor
// does it submit to any jurisdiction.

//! Topic subject construction for routing.

use anyhow::{Context, Result};
use std::collections::HashMap;

use crate::configuration::EventSchema;
use crate::notification::topic_codec::encode_subject;

/// Build topic subjects from canonicalized parameters.
///
/// This is a unit struct with no state: it only serves as a namespace, and its
/// associated functions are called as `TopicBuilder::...` without an instance.
pub struct TopicBuilder;

impl TopicBuilder {
    /// Build a schema-driven topic subject.
    pub fn build_topic_with_schema(
        event_type: &str,
        schema: &EventSchema,
        canonicalized_params: &HashMap<String, String>,
    ) -> Result<String> {
        if let Some(topic_config) = &schema.topic {
            let mut topic_parts = vec![topic_config.base.clone()];

            for key in &topic_config.key_order {
                let value = canonicalized_params
                    .get(key)
                    .context(format!("Missing key '{}' for topic building", key))?;
                topic_parts.push(value.clone());
            }

            Ok(encode_subject(&topic_parts))
        } else {
            Ok(Self::build_generic_topic(event_type, canonicalized_params))
        }
    }

    /// Build a generic topic subject for event types without schema.
    pub fn build_generic_topic(
        event_type: &str,
        canonicalized_params: &HashMap<String, String>,
    ) -> String {
        if canonicalized_params.is_empty() {
            return event_type.to_string();
        }

        // Stable key ordering keeps subject generation deterministic.
        let mut sorted_keys: Vec<_> = canonicalized_params.keys().collect();
        sorted_keys.sort();

        let mut topic_parts = vec![event_type.to_string()];

        for key in sorted_keys {
            if let Some(value) = canonicalized_params.get(key) {
                topic_parts.push(value.clone());
            }
        }

        encode_subject(&topic_parts)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::configuration::TopicConfig;

    /// Build an owned parameter map from borrowed key/value pairs.
    fn params_of(pairs: &[(&str, &str)]) -> HashMap<String, String> {
        pairs
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Build a schema whose only populated field is the topic configuration.
    fn schema_with_topic(base: &str, key_order: &[&str]) -> EventSchema {
        EventSchema {
            payload: None,
            topic: Some(TopicConfig {
                base: base.to_string(),
                key_order: key_order.iter().copied().map(String::from).collect(),
            }),
            endpoint: None,
            identifier: HashMap::new(),
            storage_policy: None,
            auth: None,
        }
    }

    #[test]
    fn test_generic_topic_building() {
        let params = params_of(&[("class", "od"), ("stream", "enfo")]);

        // Values follow key order: class first, then stream.
        assert_eq!(
            TopicBuilder::build_generic_topic("mars", &params),
            "mars.od.enfo"
        );
    }

    #[test]
    fn test_empty_params_generic_topic() {
        let topic = TopicBuilder::build_generic_topic("mars", &HashMap::new());
        assert_eq!(topic, "mars");
    }

    #[test]
    fn test_schema_topic_building() {
        let params = params_of(&[("destination", "FOO"), ("target", "E1"), ("class", "od")]);
        let schema = schema_with_topic("diss", &["destination", "target", "class"]);

        let topic =
            TopicBuilder::build_topic_with_schema("dissemination", &schema, &params).unwrap();
        assert_eq!(topic, "diss.FOO.E1.od");
    }

    #[test]
    fn test_topic_building_with_missing_schema_keys() {
        // "target" is listed in key_order but deliberately left out of the params.
        let params = params_of(&[("destination", "FOO")]);
        let schema = schema_with_topic("diss", &["destination", "target"]);

        let err = TopicBuilder::build_topic_with_schema("dissemination", &schema, &params)
            .expect_err("a key missing from the params must fail topic building");
        assert!(err.to_string().contains("Missing key 'target'"));
    }

    #[test]
    fn test_topic_building_with_special_characters() {
        let params = params_of(&[
            ("field1", "value-with-dash"),
            ("field2", "value_with_underscore"),
            ("field3", "value.with.dots"),
        ]);

        let topic = TopicBuilder::build_generic_topic("test", &params);

        // Dashes and underscores pass through untouched; dots are percent-encoded.
        for expected in [
            "value-with-dash",
            "value_with_underscore",
            "value%2Ewith%2Edots",
        ] {
            assert!(topic.contains(expected));
        }
    }

    #[test]
    fn test_topic_consistency_across_operations() {
        let params = params_of(&[("class", "od"), ("destination", "SCL")]);

        // Three builds from the same input must all agree.
        let first = TopicBuilder::build_generic_topic("dissemination", &params);
        let second = TopicBuilder::build_generic_topic("dissemination", &params);
        let third = TopicBuilder::build_generic_topic("dissemination", &params);

        assert_eq!(first, second);
        assert_eq!(second, third);
    }

    #[test]
    fn test_schema_topic_encodes_reserved_tokens() {
        let params = params_of(&[("a", "1.2"), ("b", "3*4")]);
        let schema = schema_with_topic("test", &["a", "b"]);

        let topic = TopicBuilder::build_topic_with_schema("test", &schema, &params).unwrap();
        assert_eq!(topic, "test.1%2E2.3%2A4");
    }

    #[test]
    fn test_single_parameter_topic() {
        let params = params_of(&[("only_param", "only_value")]);

        assert_eq!(
            TopicBuilder::build_generic_topic("single", &params),
            "single.only_value"
        );
    }

    #[test]
    fn test_large_number_of_parameters() {
        let params: HashMap<String, String> = (0..100)
            .map(|i| (format!("param{:03}", i), format!("value{}", i)))
            .collect();

        let topic = TopicBuilder::build_generic_topic("large", &params);

        // The event type leads the subject.
        assert!(topic.starts_with("large."));

        // Spot-check the first and last generated values.
        assert!(topic.contains("value0"));
        assert!(topic.contains("value99"));

        // A second build from the same map yields the same subject.
        assert_eq!(topic, TopicBuilder::build_generic_topic("large", &params));
    }
}