// agntcy-slim-controller 0.7.0
//
// Controller service and control API to configure the SLIM data plane through the control plane.
// Documentation
// Copyright AGNTCY Contributors (https://github.com/agntcy)
// SPDX-License-Identifier: Apache-2.0

use std::sync::Arc;

use serde::Deserialize;

use slim_config::client::ClientConfig;
use slim_config::component::configuration::Configuration;
use slim_config::component::id::ID;
use slim_config::server::ServerConfig;
use slim_datapath::message_processing::MessageProcessor;

use crate::errors::ControllerError;
use crate::service::{ControlPlane, ControlPlaneSettings, from_server_config};

/// Configuration for the Control-Plane / Data-Plane component
///
/// Every field carries `#[serde(default)]`, so each one may be omitted from
/// the serialized form; an input with no fields deserializes to the same
/// value as [`Config::default`]. Keys that do not match a field are rejected
/// (`#[serde(deny_unknown_fields)]`).
#[derive(Debug, Clone, Deserialize, Default, PartialEq)]
#[serde(deny_unknown_fields)]
pub struct Config {
    /// Controller GRPC server settings
    ///
    /// Empty when omitted from the input.
    #[serde(default)]
    pub servers: Vec<ServerConfig>,

    /// Controller client config to connect to control plane
    ///
    /// Empty when omitted from the input.
    #[serde(default)]
    pub clients: Vec<ClientConfig>,

    /// How long to keep routing state after a server-side connection drops,
    /// waiting for the peer to reconnect before notifying the control plane.
    /// Accepts duration strings like "30s", "1s", "500ms".  Defaults to 30 s.
    ///
    /// NOTE(review): the field itself is `None` when omitted; the 30 s
    /// fallback is presumably applied by whoever consumes this value, which
    /// is not visible in this file — confirm.
    #[serde(default)]
    pub recovery_ttl: Option<duration_string::DurationString>,
}

impl Config {
    /// Build a configuration with no servers, no clients and no recovery TTL.
    pub fn new() -> Self {
        Default::default()
    }

    /// Report whether this configuration is equal to [`Config::default`].
    pub fn is_default(&self) -> bool {
        *self == Self::default()
    }

    /// Replace the server list, leaving every other field untouched.
    ///
    /// Consumes `self` and returns the updated configuration so calls can be
    /// chained.
    pub fn with_servers(mut self, servers: Vec<ServerConfig>) -> Self {
        self.servers = servers;
        self
    }

    /// Replace the client list, leaving every other field untouched.
    ///
    /// Consumes `self` and returns the updated configuration so calls can be
    /// chained.
    pub fn with_clients(mut self, clients: Vec<ClientConfig>) -> Self {
        self.clients = clients;
        self
    }

    /// Borrow the configured servers as a slice.
    pub fn servers(&self) -> &[ServerConfig] {
        self.servers.as_slice()
    }

    /// Borrow the configured clients as a slice.
    pub fn clients(&self) -> &[ClientConfig] {
        self.clients.as_slice()
    }

    /// Build a [`ControlPlane`] from this configuration.
    ///
    /// Despite the `into_` prefix this method only borrows `self`; the server
    /// and client lists are cloned into the new service's settings. Note that
    /// `recovery_ttl` is not part of the settings assembled here.
    ///
    /// # Arguments
    ///
    /// * `id` - identifier handed to the control plane settings.
    /// * `group_name` - optional group name handed to the settings.
    /// * `message_processor` - shared message processor handed to the settings.
    /// * `dataplane_servers` - server configurations of the dataplane
    ///   services. Each entry is converted with `from_server_config` to
    ///   extract the connection type information (e.g., TLS settings) the
    ///   control plane needs in order to connect to the node.
    pub fn into_service(
        &self,
        id: ID,
        group_name: Option<String>,
        message_processor: Arc<MessageProcessor>,
        dataplane_servers: &[ServerConfig],
    ) -> ControlPlane {
        // Derived first, from the dataplane servers rather than from
        // `self.servers`, which describe the controller's own endpoints.
        let details = dataplane_servers.iter().map(from_server_config).collect();

        let settings = ControlPlaneSettings {
            id,
            group_name,
            servers: self.servers.to_vec(),
            clients: self.clients.to_vec(),
            message_processor,
            connection_details: details,
        };

        ControlPlane::new(settings)
    }
}

impl Configuration for Config {
    type Error = ControllerError;

    /// Validate every server configuration, then every client configuration.
    ///
    /// Stops at the first failure and returns it converted into
    /// [`ControllerError`]; returns `Ok(())` when all entries validate
    /// (including when both lists are empty).
    fn validate(&self) -> Result<(), Self::Error> {
        // Servers are checked before clients; `try_for_each` short-circuits
        // on the first error, and `?` performs the error conversion.
        self.servers
            .iter()
            .try_for_each(|server| server.validate())?;

        self.clients
            .iter()
            .try_for_each(|client| client.validate())?;

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    //! Unit tests for [`Config`]: construction, builder methods, accessors,
    //! validation, derived traits and conversion into a control-plane service.

    use super::*;
    use slim_config::component::id::{ID, Kind};
    use slim_config::server::ServerConfig;
    use slim_datapath::message_processing::MessageProcessor;
    use std::sync::Arc;

    /// Server configuration on a fixed local endpoint with insecure TLS settings.
    fn create_test_server_config() -> ServerConfig {
        ServerConfig::with_endpoint("127.0.0.1:50051")
            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure())
    }

    /// Client configuration pointing at the test server, with insecure TLS settings.
    fn create_test_client_config() -> ClientConfig {
        ClientConfig::with_endpoint("http://127.0.0.1:50051")
            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure())
    }

    /// `Config::new` yields an empty configuration, including no recovery TTL.
    #[test]
    fn test_config_new() {
        let config = Config::new();
        assert!(config.servers.is_empty());
        assert!(config.clients.is_empty());
        assert!(config.recovery_ttl.is_none());
    }

    /// `Config::default` yields an empty configuration, including no recovery TTL.
    #[test]
    fn test_config_default() {
        let config = Config::default();
        assert!(config.servers.is_empty());
        assert!(config.clients.is_empty());
        assert!(config.recovery_ttl.is_none());
    }

    /// `is_default` is true for a fresh configuration and false once any
    /// list has been populated.
    #[test]
    fn test_config_is_default() {
        assert!(Config::new().is_default());
        assert!(Config::default().is_default());

        let with_server = Config::new().with_servers(vec![create_test_server_config()]);
        assert!(!with_server.is_default());

        let with_client = Config::new().with_clients(vec![create_test_client_config()]);
        assert!(!with_client.is_default());
    }

    /// `with_servers` sets the servers and leaves the clients empty.
    #[test]
    fn test_config_with_servers() {
        let server_config = create_test_server_config();
        let config = Config::new().with_servers(vec![server_config.clone()]);

        assert_eq!(config.servers.len(), 1);
        assert_eq!(config.servers[0], server_config);
        assert!(config.clients.is_empty());
    }

    /// `with_clients` sets the clients and leaves the servers empty.
    #[test]
    fn test_config_with_clients() {
        let client_config = create_test_client_config();
        let config = Config::new().with_clients(vec![client_config.clone()]);

        assert_eq!(config.clients.len(), 1);
        assert_eq!(config.clients[0], client_config);
        assert!(config.servers.is_empty());
    }

    /// The `servers` accessor exposes exactly what was configured.
    #[test]
    fn test_config_servers_getter() {
        let server_config = create_test_server_config();
        let config = Config::new().with_servers(vec![server_config.clone()]);

        let servers = config.servers();
        assert_eq!(servers.len(), 1);
        assert_eq!(servers[0], server_config);
    }

    /// The `clients` accessor exposes exactly what was configured.
    #[test]
    fn test_config_clients_getter() {
        let client_config = create_test_client_config();
        let config = Config::new().with_clients(vec![client_config.clone()]);

        let clients = config.clients();
        assert_eq!(clients.len(), 1);
        assert_eq!(clients[0], client_config);
    }

    /// Chained builder calls keep the values set by earlier calls.
    #[test]
    fn test_config_chaining() {
        let server_config = create_test_server_config();
        let client_config = create_test_client_config();

        let config = Config::new()
            .with_servers(vec![server_config.clone()])
            .with_clients(vec![client_config.clone()]);

        assert_eq!(config.servers.len(), 1);
        assert_eq!(config.clients.len(), 1);
        // Check the contents too, so a builder that clobbered the other
        // list with a different entry would be caught.
        assert_eq!(config.servers[0], server_config);
        assert_eq!(config.clients[0], client_config);
    }

    /// An empty configuration validates successfully.
    #[test]
    fn test_config_validate_empty() {
        let config = Config::new();
        assert!(config.validate().is_ok());
    }

    /// A configuration with one valid server and one valid client validates.
    #[test]
    fn test_config_validate_with_valid_servers_and_clients() {
        let server_config = create_test_server_config();
        let client_config = create_test_client_config();
        let config = Config::new()
            .with_servers(vec![server_config])
            .with_clients(vec![client_config]);

        assert!(config.validate().is_ok());
    }

    /// A clone compares equal to its source, field by field and as a whole.
    #[test]
    fn test_config_clone() {
        let server_config = create_test_server_config();
        let client_config = create_test_client_config();

        let config1 = Config::new()
            .with_servers(vec![server_config])
            .with_clients(vec![client_config]);

        let config2 = config1.clone();

        assert_eq!(config1.servers, config2.servers);
        assert_eq!(config1.clients, config2.clients);
        assert_eq!(config1, config2);
    }

    /// `into_service` builds a control plane without panicking.
    ///
    /// NOTE(review): this runs under `#[tokio::test]` although nothing is
    /// awaited — presumably constructing the service needs a runtime; confirm.
    #[tokio::test]
    async fn test_config_into_service() {
        let server_config = create_test_server_config();
        let client_config = create_test_client_config();

        let config = Config::new()
            .with_servers(vec![server_config.clone()])
            .with_clients(vec![client_config]);

        let id = ID::new_with_name(Kind::new("slim").unwrap(), "test-instance").unwrap();
        let group_name = Some("test-group".to_string());
        let message_processor = Arc::new(MessageProcessor::new());

        let _control_plane =
            config.into_service(id, group_name, message_processor, &[server_config]);
    }

    /// The derived `Debug` output names the type and each of its fields.
    #[test]
    fn test_config_debug_trait() {
        let config = Config::new();
        let debug_str = format!("{:?}", config);
        assert!(debug_str.contains("Config"));
        assert!(debug_str.contains("servers"));
        assert!(debug_str.contains("clients"));
        assert!(debug_str.contains("recovery_ttl"));
    }

    /// Validation succeeds when several valid servers are configured.
    #[test]
    fn test_config_validate_with_multiple_servers() {
        let server1 = create_test_server_config();
        let server2 = ServerConfig::with_endpoint("127.0.0.1:50052")
            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());

        let config = Config::new().with_servers(vec![server1, server2]);
        assert!(config.validate().is_ok());
    }

    /// Validation succeeds when several valid clients are configured.
    #[test]
    fn test_config_validate_with_multiple_clients() {
        let client1 = create_test_client_config();
        let client2 = ClientConfig::with_endpoint("http://127.0.0.1:50052")
            .with_tls_setting(slim_config::tls::client::TlsClientConfig::insecure());

        let config = Config::new().with_clients(vec![client1, client2]);
        assert!(config.validate().is_ok());
    }

    /// Two fresh configurations are equal; adding a server makes them differ.
    #[test]
    fn test_config_partial_eq() {
        let config1 = Config::new();
        let config2 = Config::new();

        assert_eq!(config1, config2);

        let server_config = create_test_server_config();
        let config3 = config1.clone().with_servers(vec![server_config]);

        assert_ne!(config1, config3);
    }

    /// Builders applied to clones of a base configuration do not affect the
    /// base or each other.
    #[test]
    fn test_config_builder_pattern_reuse() {
        let base_config = Config::new();

        let config1 = base_config
            .clone()
            .with_servers(vec![create_test_server_config()]);
        let config2 = base_config
            .clone()
            .with_clients(vec![create_test_client_config()]);

        assert!(base_config.servers.is_empty());
        assert!(base_config.clients.is_empty());

        assert_eq!(config1.servers.len(), 1);
        assert!(config1.clients.is_empty());

        assert!(config2.servers.is_empty());
        assert_eq!(config2.clients.len(), 1);
    }

    /// A second `with_servers` call replaces the list rather than appending to it.
    #[test]
    fn test_config_overwrite_behavior() {
        let server1 = create_test_server_config();
        let server2 = ServerConfig::with_endpoint("127.0.0.1:50052")
            .with_tls_settings(slim_config::tls::server::TlsServerConfig::insecure());

        let config = Config::new()
            .with_servers(vec![server1])
            .with_servers(vec![server2.clone()]);

        assert_eq!(config.servers.len(), 1);
        assert_eq!(config.servers[0], server2);
    }
}