// Documentation
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use crate::cluster::router::{
    manager::{condition_manager::ConditionRouterManager, tag_manager::TagRouterManager},
    nacos_config_center::nacos_client::NacosClient,
    router_chain::RouterChain,
};
use crate::Url;
use dubbo_config::{
    get_global_config,
    router::{ConditionRouterConfig, NacosConfig, TagRouterConfig},
};
use crate::logger::tracing::{info, trace};
use once_cell::sync::OnceCell;
use std::{
    collections::HashMap,
    sync::{Arc, RwLock},
};

/// Process-wide cell holding the shared [`RouterManager`]; it is filled on first
/// access by [`get_global_router_manager`].
pub static GLOBAL_ROUTER_MANAGER: OnceCell<Arc<RwLock<RouterManager>>> = OnceCell::new();
/// Name used for the tag router: the key it is added under in a router chain,
/// the `router_kind` value matched for tag change events, and an argument passed
/// to the Nacos client when fetching configs.
const TAG: &str = "tag";
/// Name used for the condition router: the key it is added under in a router
/// chain, the `router_kind` value matched for condition change events, and an
/// argument passed to the Nacos client when fetching configs.
const CONDITION: &str = "condition";
/// Routing state for this process: the condition and tag router managers, an
/// optional Nacos client, and the consumer URLs keyed by service name.
pub struct RouterManager {
    /// Receives condition router configs and hands out per-service condition routers.
    pub condition_router_manager: ConditionRouterManager,
    /// Receives tag router configs and hands out per-service tag routers.
    pub tag_router_manager: TagRouterManager,
    /// Nacos client; set to `Some` by `init_nacos`.
    pub nacos: Option<NacosClient>,
    /// Consumer URL for each service name, filled from the global config's
    /// `routers.consumer` entries during `init`.
    pub consumer: HashMap<String, Url>,
}

impl RouterManager {
    pub fn get_router_chain(&self, service: String) -> RouterChain {
        let mut chain = RouterChain::new();
        if let Some(url) = self.consumer.get(service.as_str()) {
            if let Some(tag_router) = self.tag_router_manager.get_router(&service) {
                chain.add_router(TAG.to_string(), Box::new(tag_router));
            }
            if let Some(condition_router) = self.condition_router_manager.get_router(&service) {
                chain.add_router(CONDITION.to_string(), Box::new(condition_router));
            }
            chain.self_url = url.clone();
        }
        chain
    }

    pub fn notify(&mut self, event: RouterConfigChangeEvent) {
        match event.router_kind.as_str() {
            CONDITION => {
                let config: ConditionRouterConfig =
                    serde_yaml::from_str(event.content.as_str()).unwrap();
                self.condition_router_manager.update(config)
            }
            TAG => {
                let config: TagRouterConfig = serde_yaml::from_str(event.content.as_str()).unwrap();
                self.tag_router_manager.update(config)
            }
            _ => {
                info!("other router change event")
            }
        }
    }

    pub fn init_nacos(&mut self, config: NacosConfig) {
        self.nacos = Some(NacosClient::new_init_client(config));
        self.init_router_managers_for_nacos();
    }

    fn init_router_managers_for_nacos(&mut self) {
        if let Some(tag_config) = self
            .nacos
            .as_ref()
            .and_then(|n| n.get_config("application", TAG, TAG))
        {
            self.tag_router_manager.update(tag_config);
        }

        if let Some(condition_app_config) = self
            .nacos
            .as_ref()
            .and_then(|n| n.get_config("application", CONDITION, TAG))
        {
            self.condition_router_manager.update(condition_app_config);
        }

        for (service_name, _) in &self.consumer {
            if let Some(condition_config) = self
                .nacos
                .as_ref()
                .and_then(|n| n.get_config(service_name, CONDITION, CONDITION))
            {
                self.condition_router_manager.update(condition_config);
            }
        }
    }

    pub fn init(&mut self) {
        let config = get_global_config().routers.clone();
        self.init_consumer_configs();
        if let Some(nacos_config) = &config.nacos {
            self.init_nacos(nacos_config.clone());
        } else {
            trace!("Nacos not configured, using local YAML configuration for routing");
            if let Some(condition_configs) = &config.conditions {
                for condition_config in condition_configs {
                    self.condition_router_manager
                        .update(condition_config.clone());
                }
            } else {
                info!("Unconfigured Condition Router")
            }
            if let Some(tag_config) = &config.tags {
                self.tag_router_manager.update(tag_config.clone());
            } else {
                info!("Unconfigured Tag Router")
            }
        }
    }

    fn init_consumer_configs(&mut self) {
        let consumer_configs = get_global_config()
            .routers
            .consumer
            .clone()
            .unwrap_or_else(Vec::new);

        for consumer_config in consumer_configs {
            let service_url = Url::from_url(
                format!("{}/{}", consumer_config.url, consumer_config.service).as_str(),
            )
            .expect("Consumer config error");

            self.consumer.insert(consumer_config.service, service_url);
        }
    }
}

/// Returns the process-wide [`RouterManager`], creating it on first call.
///
/// On first call a manager with default condition/tag managers, no Nacos client
/// and no consumers is built, `init` is run on it, and the result is stored in
/// [`GLOBAL_ROUTER_MANAGER`]. Later calls return the same instance.
///
/// # Panics
///
/// The first call panics if `RouterManager::init` panics.
pub fn get_global_router_manager() -> &'static Arc<RwLock<RouterManager>> {
    GLOBAL_ROUTER_MANAGER.get_or_init(|| {
        let mut router_manager = RouterManager {
            condition_router_manager: ConditionRouterManager::default(),
            tag_router_manager: TagRouterManager::default(),
            nacos: None,
            consumer: HashMap::new(),
        };
        router_manager.init();
        Arc::new(RwLock::new(router_manager))
    })
}

/// A router configuration change, as consumed by `RouterManager::notify`.
#[derive(Debug, Default, Clone)]
pub struct RouterConfigChangeEvent {
    /// Presumably the service the change applies to — confirm with the producer
    /// of these events.
    pub service_name: String,
    /// Router kind; `notify` matches it against `"condition"` and `"tag"`.
    pub router_kind: String,
    /// Configuration text; `notify` parses it as YAML.
    pub content: String,
}