use std::future::Future;
use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
};
use crate::tina::config::{log::LogConfig, ConfigExt, UpdateableConfig};
use crate::tina::config::{security::SecurityConfig, server::ServerConfig};
use crate::tina::core::service::cache::ICacheService;
use crate::tina::core::service::date_time::IDateTimeService;
use crate::tina::core::service::dict::IDictService;
use crate::tina::core::service::file::IFileService;
use crate::tina::core::service::log_record::ILogRecordService;
use crate::tina::core::service::operation_log::IOperationLogService;
use crate::tina::core::service::permission::IPermissionService;
use crate::tina::core::service::throttle::IThrottleService;
use crate::tina::core::service::user::IUserService;
use crate::tina::data::AppResult;
use crate::tina::security::password_encoder::PasswordEncoder;
use crate::tina::util::Utility;
use crate::tina::{client::registry::RegistryClient, config::registry::RegistryConfig};
use crate::{app_error_from, app_system_error};
use config::builder::DefaultState;
use config::{Config, ConfigBuilder};
use dashmap::DashMap;
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use reqwest::Client;
use reqwest_middleware::ClientWithMiddleware;
use serde::{Deserialize, Serialize, Serializer};
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone)]
pub struct Application(Arc<AppConfig>);
impl From<AppConfig> for Application {
fn from(config: AppConfig) -> Self {
Application(Arc::new(config))
}
}
impl From<Arc<AppConfig>> for Application {
fn from(config: Arc<AppConfig>) -> Self {
Application(config)
}
}
impl Debug for Application {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("Application").finish()
}
}
impl Deref for Application {
type Target = AppConfig;
fn deref(&self) -> &Self::Target {
self.0.deref()
}
}
impl AsRef<Application> for Application {
fn as_ref(&self) -> &Application {
self
}
}
pub type AppConfigExtention = Arc<dyn Any + Send + Sync + 'static>;
#[derive(Debug)]
pub struct AppConfigExtentionMessage {
pub event: AppConfigEvent,
pub extension: Option<(TypeId, AppConfigExtention)>,
}
#[derive(Debug, Clone, Copy)]
pub enum AppConfigEvent {
Terminate,
ExtensionChange,
}
pub type AppConfigRegistryConfigChangeFunc = Arc<dyn Fn(Config) -> BoxFuture<'static, AppResult<()>> + Send + Sync + 'static>;
#[allow(unused)]
#[derive(Serialize, Deserialize)]
pub struct AppConfig {
pub address: String,
pub port: Option<usize>,
pub workers: usize,
pub application_name: String,
pub version: String,
pub copyright_year: String,
pub shutdown_after_start: Option<bool>,
#[serde(skip)]
extensions: Arc<DashMap<TypeId, AppConfigExtention>>,
#[serde(skip)]
pub http_client: Option<ClientWithMiddleware>,
#[serde(skip)]
pub(crate) registry_config_change_fn: Option<AppConfigRegistryConfigChangeFunc>,
}
impl Debug for AppConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AppConfig")
.field("address", &self.address)
.field("port", &self.port)
.field("workers", &self.workers)
.field("application_name", &self.application_name)
.field("version", &self.version)
.field("copyright_year", &self.copyright_year)
.field("extensions", &self.extensions)
.finish()
}
}
impl AppConfig {
pub fn address(mut self, address: &str) -> Self {
self.address = address.to_owned();
self
}
pub fn port(mut self, port: usize) -> Self {
self.port = Some(port);
self
}
pub fn workers(mut self, workers: usize) -> Self {
self.workers = workers;
self
}
pub fn application_name(mut self, application_name: &str) -> Self {
self.application_name = application_name.to_owned();
self
}
pub fn version(mut self, version: &str) -> Self {
self.version = version.to_owned();
self
}
pub fn copyright_year(mut self, copyright_year: &str) -> Self {
self.copyright_year = copyright_year.to_owned();
self
}
pub fn http_client(mut self, http_client: ClientWithMiddleware) -> Self {
self.http_client = Some(http_client);
self
}
}
impl Default for AppConfig {
fn default() -> Self {
Self::new()
}
}
impl AppConfig {
pub fn new() -> AppConfig {
AppConfig {
address: "0.0.0.0".to_string(),
port: None,
workers: 50,
application_name: "".to_owned(),
version: "1.0.0".to_string(),
copyright_year: "2023".to_string(),
extensions: Default::default(),
http_client: Some(ClientWithMiddleware::from(Client::default())),
registry_config_change_fn: None,
shutdown_after_start: None,
}
}
pub fn from_yaml_content(content: impl AsRef<str>) -> AppResult<(Self, Config)> {
let config = ConfigBuilder::<DefaultState>::default()
.add_source(config::File::from_str(content.as_ref(), config::FileFormat::Yaml))
.build()
.map_err(app_error_from!())?;
Self::from_config(config)
}
pub fn from_config(config: Config) -> AppResult<(Self, Config)> {
let app = config.clone().try_deserialize::<Self>().map_err(app_error_from!())?;
Ok((app, config))
}
pub fn extension<T: Send + Sync + 'static>(&self) -> AppResult<Arc<T>> {
let type_id = TypeId::of::<T>();
match self.extensions.get(&type_id) {
None => Err(crate::app_system_error!("no type found in extension: {}", type_name::<T>())),
Some(ext) => ext
.clone()
.downcast::<T>()
.map_err(|err| crate::app_system_error!("cast extension type failed: {}, reason: {:?}", type_name::<T>(), err)),
}
}
pub fn add_extension<T: Send + Sync + 'static>(&self, ext: T) {
let type_id = TypeId::of::<T>();
self.extensions.insert(type_id, Arc::new(ext));
}
pub fn is_terminated(&self) -> bool {
Utility::is_terminated()
}
pub fn terminate(&self) {
Utility::terminate()
}
pub fn get_http_client(&self) -> AppResult<&ClientWithMiddleware> {
self.http_client.as_ref().ok_or_else(|| app_system_error!("no http client found in application"))
}
async fn init_default_config(&mut self, config0: &Config) -> AppResult<()> {
let config = config0.get_or_default::<LogConfig>("log");
config.apply(self).await?;
self.set_config(config);
let config = config0.get_or_default::<RegistryConfig>("registry");
config.apply(self).await?;
self.set_config(config);
let config = config0.get_or_default::<SecurityConfig>("security");
config.apply(self).await?;
self.set_config(config);
let config = config0.get_or_default::<ServerConfig>("server");
config.apply(self).await?;
self.set_config(config);
Ok(())
}
pub(crate) async fn update_default_config(&self, config: &Config) -> AppResult<()> {
if let Ok(v) = config.get::<LogConfig>("log") {
tracing::info!("更新配置[{}]: {:?}", type_name::<LogConfig>(), v);
v.apply(self).await?;
self.set_config(v)
}
if let Ok(v) = config.get::<RegistryConfig>("registry") {
tracing::info!("更新配置[{}]: {:?}", type_name::<RegistryConfig>(), v);
v.apply(self).await?;
self.set_config(v)
}
if let Ok(v) = config.get::<SecurityConfig>("security") {
tracing::info!("更新配置[{}]: {:?}", type_name::<SecurityConfig>(), v);
v.apply(self).await?;
self.set_config(v)
}
if let Ok(v) = config.get::<ServerConfig>("server") {
tracing::info!("更新配置[{}]: {:?}", type_name::<ServerConfig>(), v);
v.apply(self).await?;
self.set_config(v)
}
Ok(())
}
pub async fn update_config<T: for<'de> Deserialize<'de> + UpdateableConfig + Debug + Send + Sync + 'static>(
&self,
config: &Config,
root_key: &str,
) -> AppResult<()> {
if let Ok(v) = config.get::<T>(root_key) {
tracing::info!("更新配置[{}]: {:?}", type_name::<ServerConfig>(), v);
v.apply(self).await?;
self.set_config(v)
}
Ok(())
}
pub async fn init(mut self, config: &Config) -> AppResult<Application> {
self.init_default_config(config).await?;
self.set_password_encoder(PasswordEncoder::try_new()?);
#[cfg(feature = "client-grpc")]
{
self.add_extension(crate::tina::client::GrpcClient::default());
}
let application = Application::from(self);
#[cfg(feature = "client-nacos")]
{
if let Err(err) = application.regist_client() {
tracing::error!("注册客户端失败...{err:?}");
return Err(err);
}
}
Ok(application)
}
pub fn with_config_change<F, R>(&mut self, f: F) -> &mut Self
where
F: Fn(Config) -> R + Send + Sync + 'static,
R: Future<Output = AppResult<()>> + Send + 'static,
{
let func = move |config: Config| f(config).boxed();
self.registry_config_change_fn = Some(Arc::new(func) as AppConfigRegistryConfigChangeFunc);
self
}
}
impl AppConfig {
pub fn set_password_encoder(&mut self, encoder: PasswordEncoder) {
self.add_extension(encoder)
}
pub fn get_password_encoder(&self) -> AppResult<Arc<PasswordEncoder>> {
self.extension::<PasswordEncoder>()
}
pub fn set_service<S: Send + Sync + ?Sized + 'static>(&mut self, service: Arc<S>) {
let type_id = TypeId::of::<Arc<S>>();
self.extensions.insert(type_id, Arc::new(service));
}
pub fn get_service<S: Send + Sync + ?Sized + 'static>(&self) -> AppResult<Arc<S>> {
let type_id = TypeId::of::<Arc<S>>();
match self.extensions.get(&type_id) {
None => Err(app_system_error!("no {} service init in application", type_name::<Arc<S>>())),
Some(service) => match service.downcast_ref::<Arc<S>>() {
None => Err(app_system_error!("no {} service init in application", type_name::<Arc<S>>())),
Some(service) => Ok(service.clone()),
},
}
}
pub fn set_permission_service(&mut self, service: impl IPermissionService) {
self.set_service(Arc::new(service) as Arc<dyn IPermissionService>)
}
pub fn get_permission_service(&self) -> AppResult<Arc<dyn IPermissionService>> {
self.get_service::<dyn IPermissionService>()
}
pub fn set_user_service(&mut self, service: impl IUserService) {
self.set_service(Arc::new(service) as Arc<dyn IUserService>)
}
pub fn get_user_service(&self) -> AppResult<Arc<dyn IUserService>> {
self.get_service::<dyn IUserService>()
}
pub fn set_operation_log_service(&mut self, service: impl IOperationLogService) {
self.set_service(Arc::new(service) as Arc<dyn IOperationLogService>)
}
pub fn get_operation_log_service(&self) -> AppResult<Arc<dyn IOperationLogService>> {
self.get_service::<dyn IOperationLogService>()
}
pub fn set_log_record_service(&mut self, service: impl ILogRecordService) {
self.set_service(Arc::new(service) as Arc<dyn ILogRecordService>)
}
pub fn get_log_record_service(&self) -> AppResult<Arc<dyn ILogRecordService>> {
self.get_service::<dyn ILogRecordService>()
}
pub fn set_dict_service(&mut self, service: impl IDictService) {
self.set_service(Arc::new(service) as Arc<dyn IDictService>)
}
pub fn get_dict_service(&self) -> AppResult<Arc<dyn IDictService>> {
self.get_service::<dyn IDictService>()
}
pub fn set_file_service(&mut self, service: impl IFileService) {
self.set_service(Arc::new(service) as Arc<dyn IFileService>)
}
pub fn get_file_service(&self) -> AppResult<Arc<dyn IFileService>> {
self.get_service::<dyn IFileService>()
}
pub fn set_date_time_service(&mut self, service: impl IDateTimeService) {
self.set_service(Arc::new(service) as Arc<dyn IDateTimeService>)
}
pub fn get_date_time_service(&self) -> AppResult<Arc<dyn IDateTimeService>> {
self.get_service::<dyn IDateTimeService>()
}
pub fn set_cache_service<T: ICacheService>(&mut self, service: T) {
self.set_service(Arc::new(service))
}
pub fn get_cache_service<T: ICacheService>(&self) -> AppResult<Arc<T>> {
self.get_service::<T>()
}
pub fn set_throttle_service(&mut self, service: impl IThrottleService) {
self.set_service(Arc::new(service) as Arc<dyn IThrottleService>)
}
pub fn get_throttle_service(&self) -> AppResult<Arc<dyn IThrottleService>> {
self.get_service::<dyn IThrottleService>()
}
}
impl AppConfig {
pub fn get_config<T: UpdateableConfig + Send + Sync + 'static>(&self) -> AppResult<Arc<T>> {
self.extension::<T>()
}
pub fn set_config<T: UpdateableConfig + Send + Sync + 'static>(&self, config: T) {
self.add_extension(config)
}
pub fn get_server_config(&self) -> AppResult<Arc<ServerConfig>> {
self.get_config::<ServerConfig>()
}
pub fn get_registry_config(&self) -> AppResult<Arc<RegistryConfig>> {
self.get_config::<RegistryConfig>()
}
pub fn get_security_config(&self) -> AppResult<Arc<SecurityConfig>> {
self.get_config::<SecurityConfig>()
}
pub fn get_log_config(&self) -> AppResult<Arc<LogConfig>> {
self.get_config::<LogConfig>()
}
}
impl AppConfig {
pub fn get_registry_client(&self) -> AppResult<Arc<RegistryClient>> {
self.extension::<RegistryClient>()
}
fn set_registry_client(&self, client: RegistryClient) {
self.add_extension(client)
}
}
#[cfg(feature = "client-grpc")]
impl AppConfig {
pub fn get_grpc_client(&self) -> AppResult<Arc<crate::tina::client::GrpcClient>> {
self.extension::<crate::tina::client::GrpcClient>()
}
}
#[allow(dead_code)]
fn box_fn<T, F, R>(f: F) -> impl Fn(Application) -> BoxFuture<'static, R>
where
T: Future<Output = R> + Send + 'static,
F: Fn(Application) -> T + Copy + Send + Sync + 'static,
R: Send + 'static,
{
move |application: Application| async move { f(application).await }.boxed()
}
#[allow(deprecated, dead_code)]
fn resolve_path(path: &mut String) -> AppResult<()> {
if path.is_empty() {
*path = Utility::get_current_exe_dir()?;
} else if let Some(s) = path.chars().next() {
match s {
'.' => {
let dir = Utility::get_current_exe_dir()?;
*path = path.replacen(s.to_string().as_str(), dir.as_str(), 1);
}
'~' => match std::env::home_dir() {
None => {
let dir = std::env::current_dir().map_err(app_error_from!())?;
*path = path.replacen(s.to_string().as_str(), dir.as_path().to_string_lossy().as_ref(), 1);
}
Some(dir) => {
*path = path.replacen(s.to_string().as_str(), dir.as_path().to_string_lossy().as_ref(), 1);
}
},
_ => {}
}
}
Ok(())
}
#[cfg(feature = "client-nacos")]
mod nacos {
use crate::{
app_system_error,
tina::{client::registry::IRegistryClient, data::AppResult, util::not_empty::INotEmpty},
};
use super::Application;
impl Application {
pub fn regist_client(&self) -> AppResult<()> {
let application = self.clone();
std::thread::Builder::new()
.name("注册服务线程".to_string())
.spawn(move || {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(1)
.enable_all()
.build()
.expect("build registry rt failed");
rt.block_on(async move {
let func = move || async move {
let registry_config = application.get_registry_config()?;
let server_addr = registry_config.address.join(",");
let app_name = registry_config.app_name.to_owned();
let mut client_builder = crate::tina::client::registry::RegistryClientBuilder::new(server_addr, app_name);
if let Some(v) = registry_config.username.as_ref() {
client_builder = client_builder.auth_username(v);
}
if let Some(v) = registry_config.password.as_ref() {
client_builder = client_builder.auth_password(v);
}
if let Some(v) = registry_config.namespace.as_ref() {
client_builder = client_builder.namespace(v);
}
let mut client = client_builder.build();
crate::tina::client::registry::IRegistryClient::init(&mut client, &application).await?;
let config_infos = registry_config.config.as_ref();
if let Some(config_infos) = config_infos {
for config_info in config_infos.iter() {
let data_id = config_info.data_id.to_owned();
let group = config_info.group.to_owned();
if data_id.not_empty() && group.not_empty() {
let config = client.get_raw_config(&application, data_id, group).await?;
if let Some(config) = config {
application.update_default_config(&config).await?;
if let Some(func) = application.registry_config_change_fn.as_ref() {
let func = func.clone();
func(config).await?;
}
}
}
}
}
application.set_registry_client(client);
tracing::info!("注册客户端成功: {:?}", application.get_registry_client()?);
Ok(()) as AppResult<()>
};
loop {
let func2 = func.clone();
match func2().await {
Ok(v) => break v,
Err(err) => {
tracing::error!("注册客户端失败: {err:?}");
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
continue;
}
}
}
})
})
.map_err(|err| app_system_error!("启动注册服务线程失败: {err:?}"))?
.join()
.map_err(|_| app_system_error!("启动注册服务线程失败"))?;
Ok(())
}
}
}
pub trait FromApplication {
type Target;
fn from_application(application: Application, init_components: bool) -> AppResult<Self::Target>;
}
impl Serialize for Application {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
self.0.serialize(serializer)
}
}
#[crate::async_trait]
pub trait ApplicationExt {
async fn sleep_current_task(&self, duration: Duration);
fn spawn<F: Future<Output = AppResult<R>> + Send + 'static, R: Send + 'static>(&self, fut: F) -> AppResult<()>;
}