Documentation
// Copyright (c) 2025, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

//! Flex Gateway service configurations
//!
//! This module provides predefined types and configurations for Flex Gateway services
//! in integration tests. It supports both static and dynamic configurations,
//! API management and policy testing scenarios.
//!

use std::collections::HashMap;
use std::time::Duration;
use versions::Versioning;

use crate::config::{Config, ContainerConfig};
use crate::container::Container;
use crate::error::TestError;
use crate::image::Image;
use crate::port::{Port, PortAccess};
use crate::probe::{MessageProbe, MessageSource};
use crate::service::Service;
use crate::services::flex::dynamic::DynamicFlexConfig;

const FLEX_LOCAL_BASE: &str = "/usr/local/share";
const FLEX_CONFIG_BASE: &str = "mulesoft/flex-gateway/conf.d";
const FLEX_SCHEMA: &str = "http";

/// Default Flex Docker image name.
pub const FLEX_IMAGE_NAME: &str = "mulesoft/flex-gateway";

mod api;
mod dynamic;
mod gcl;
mod policy;

#[cfg(feature = "experimental")]
mod service;

pub use api::{ApiConfig, ApiConfigBuilder};
pub use policy::{PolicyConfig, PolicyConfigBuilder};

#[cfg(feature = "experimental")]
pub use service::{UpstreamServiceConfig, UpstreamServiceConfigBuilder};

/// Configuration for a local Flex service.
#[derive(Debug, Clone)]
pub struct FlexConfig {
    hostname: String,
    image_name: String,
    version: String,
    config_mounts: Vec<(String, String)>,
    ports: Vec<Port>,
    timeout: Duration,
    dynamic_config: DynamicFlexConfig,
}

impl FlexConfig {
    /// Returns the Flex hostname.
    pub fn hostname(&self) -> &str {
        &self.hostname
    }

    /// Returns the Flex Docker version.
    pub fn version(&self) -> &str {
        &self.version
    }

    /// Returns the Flex Docker image name.
    pub fn image_name(&self) -> &str {
        &self.image_name
    }

    /// Returns the map of Flex configuration mount points.
    pub fn config_mounts(&self) -> &[(String, String)] {
        &self.config_mounts
    }

    /// Returns the list of Flex listening ports.
    pub fn ports(&self) -> &[Port] {
        &self.ports
    }

    /// Returns the Flex readiness timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Creates a new Flex configuration with default settings.
    pub fn new() -> Self {
        Self {
            hostname: "local-flex".to_string(),
            version: std::env::var("PDK_TEST_FLEX_IMAGE_VERSION")
                .unwrap_or_else(|_| "latest".to_string()),
            image_name: std::env::var("PDK_TEST_FLEX_IMAGE_NAME")
                .unwrap_or_else(|_| FLEX_IMAGE_NAME.to_string()),
            config_mounts: vec![],
            ports: vec![],
            timeout: Duration::from_secs(60),
            dynamic_config: DynamicFlexConfig::new(),
        }
    }

    /// Creates a builder for initialize a [`FlexConfig`].
    pub fn builder() -> FlexConfigBuilder {
        FlexConfigBuilder::new()
    }
}

impl Default for FlexConfig {
    fn default() -> Self {
        Self::new()
    }
}

/// A builder for initilizing a [`FlexConfig`].
#[derive(Debug)]
pub struct FlexConfigBuilder {
    config: FlexConfig,
}

impl FlexConfigBuilder {
    fn new() -> Self {
        Self {
            config: FlexConfig::new(),
        }
    }

    /// Sets the Flex hostname.
    /// By default set to "local-flex".
    pub fn hostname<T: Into<String>>(self, hostname: T) -> Self {
        Self {
            config: FlexConfig {
                hostname: hostname.into(),
                ..self.config
            },
        }
    }

    /// Sets the Flex Docker version.
    /// By default set to "latest".
    pub fn version<T: Into<String>>(self, version: T) -> Self {
        Self {
            config: FlexConfig {
                version: version.into(),
                ..self.config
            },
        }
    }

    /// Sets the Flex Docker image name.
    /// By default set to [`FLEX_IMAGE_NAME`].
    pub fn image_name<T: Into<String>>(self, image_name: T) -> Self {
        Self {
            config: FlexConfig {
                image_name: image_name.into(),
                ..self.config
            },
        }
    }

    /// Sets the Flex readiness timeout.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self {
            config: FlexConfig {
                timeout,
                ..self.config
            },
        }
    }

    /// Sets the map of Flex configuration mount points.
    pub fn config_mounts<T, S, D>(self, config_mounts: T) -> Self
    where
        T: IntoIterator<Item = (S, D)>,
        S: Into<String>,
        D: Into<String>,
    {
        Self {
            config: FlexConfig {
                config_mounts: config_mounts
                    .into_iter()
                    .map(|(s, d)| (s.into(), d.into()))
                    .collect(),
                ..self.config
            },
        }
    }

    /// Sets the Flex listening ports.
    pub fn ports<T>(self, ports: T) -> Self
    where
        T: IntoIterator<Item = Port>,
    {
        Self {
            config: FlexConfig {
                ports: ports.into_iter().collect(),
                ..self.config
            },
        }
    }

    pub fn with_api(self, api: ApiConfig) -> Self {
        let mut ports = self.config.ports;
        ports.push(api.port);
        Self {
            config: FlexConfig {
                ports,
                dynamic_config: self.config.dynamic_config.with_api(api),
                ..self.config
            },
        }
    }

    #[cfg(feature = "experimental")]
    pub fn with_upstream_service(self, service: UpstreamServiceConfig) -> Self {
        Self {
            config: FlexConfig {
                dynamic_config: self.config.dynamic_config.with_upstream_service(service),
                ..self.config
            },
        }
    }

    /// Builds a new [`FlexConfig`].
    pub fn build(self) -> FlexConfig {
        self.config
    }
}

fn readiness(version: &str, timeout: Duration) -> Result<MessageProbe, TestError> {
    let versioning = Versioning::new(version)
        .ok_or_else(|| TestError::Startup(format!("Unable to parse Flex version `{version}`.")))?;
    let times = if version == "latest" || versioning >= Versioning::new("1.7.0").unwrap() {
        1
    } else if versioning >= Versioning::new("1.4.0").unwrap() {
        2
    } else {
        1
    };

    Ok(MessageProbe::builder("cds: added/updated")
        .times(times)
        .timeout(timeout)
        .source(MessageSource::StdOut)
        .build())
}

impl Config for FlexConfig {
    fn hostname(&self) -> &str {
        &self.hostname
    }

    fn port(&self) -> Port {
        self.ports.first().cloned().unwrap_or_default()
    }

    fn schema(&self) -> &str {
        FLEX_SCHEMA
    }

    fn to_container_config(&self) -> Result<ContainerConfig, TestError> {
        let config_mounts = self.config_mounts.iter();
        let dynamic = self.dynamic_config.dirs()?;

        let mounts = config_mounts.chain(dynamic.iter()).map(|(host, flex)| {
            (
                host.clone(),
                FLEX_LOCAL_BASE.to_string(),
                format!("{FLEX_CONFIG_BASE}/{flex}"),
            )
        });
        let ports = self.ports.iter().map(|&m| PortAccess::published(m));

        Ok(ContainerConfig::builder(
            self.hostname.clone(),
            Image::from_repository(&self.image_name).with_version(&self.version),
        )
        .ports(ports)
        .mounts(mounts)
        .readiness(readiness(&self.version, self.timeout)?)
        .build())
    }
}

/// Represents a Flex service instance.
#[derive(Default, Clone)]
pub struct Flex {
    sockets: HashMap<Port, String>,
}

impl Flex {
    /// Returns the external URL for a Flex listening `port`.
    /// If the port was not configured, returns [`None`].
    pub fn external_url(&self, port: Port) -> Option<String> {
        self.sockets
            .get(&port)
            .map(|socket| format!("{FLEX_SCHEMA}://{socket}"))
    }
}

impl Service for Flex {
    type Config = FlexConfig;

    fn new(_config: &Self::Config, container: &Container) -> Self {
        Self {
            sockets: container.sockets().clone(),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::error::TestError;

    use super::readiness;

    #[test]
    fn readiness_for_1_3_0() -> Result<(), TestError> {
        let readiness = readiness("1.3.0", Duration::from_secs(1))?;
        assert_eq!(readiness.times(), 1);

        Ok(())
    }

    #[test]
    fn readiness_for_1_6_0() -> Result<(), TestError> {
        let readiness = readiness("1.6.0", Duration::from_secs(1))?;
        assert_eq!(readiness.times(), 2);

        Ok(())
    }

    #[test]
    fn readiness_for_1_7_0() -> Result<(), TestError> {
        let readiness = readiness("1.7.0", Duration::from_secs(1))?;
        assert_eq!(readiness.times(), 1);

        Ok(())
    }

    #[test]
    fn readiness_for_latest() -> Result<(), TestError> {
        let readiness = readiness("latest", Duration::from_secs(1))?;
        assert_eq!(readiness.times(), 1);

        Ok(())
    }
}