use std::sync::Arc;
use std::time::Duration;
use parking_lot::RwLock;
use tokio::sync::Notify;
use tokio::task::JoinHandle;
use tracing::{debug, error, info, warn};
use crate::api::config::{
ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigInfo, ConfigListenContext,
ConfigPublishRequest, ConfigPublishResponse, ConfigQueryRequest, ConfigQueryResponse,
ConfigRemoveRequest, ConfigRemoveResponse, ConfigSearchRequest, ConfigSearchResponse,
ConfigSearchItem,
};
use crate::cache::FileCache;
use crate::common::{md5_hash, DEFAULT_GROUP};
use crate::config::{
CallbackListener, ConfigCache, ConfigChangeEvent, ConfigChangeType, ConfigListener,
ListenerRegistry,
};
use crate::error::{BatataError, Result};
use crate::remote::RpcClient;
use crate::CacheConfig;
pub struct ConfigService {
rpc_client: Arc<RpcClient>,
cache: Arc<ConfigCache>,
file_cache: Option<Arc<FileCache>>,
cache_config: CacheConfig,
listeners: Arc<ListenerRegistry>,
namespace: String,
started: Arc<RwLock<bool>>,
listen_task: Arc<RwLock<Option<JoinHandle<()>>>>,
shutdown: Arc<Notify>,
}
impl ConfigService {
pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
let file_cache = cache_config
.cache_dir
.as_ref()
.and_then(|dir| FileCache::new(dir).ok())
.map(Arc::new);
Self {
rpc_client,
cache: Arc::new(ConfigCache::new()),
file_cache,
cache_config,
listeners: Arc::new(ListenerRegistry::new()),
namespace: namespace.to_string(),
started: Arc::new(RwLock::new(false)),
listen_task: Arc::new(RwLock::new(None)),
shutdown: Arc::new(Notify::new()),
}
}
pub async fn start(&self) -> Result<()> {
if *self.started.read() {
return Err(BatataError::ClientAlreadyStarted);
}
*self.started.write() = true;
let listeners = self.listeners.clone();
let cache = self.cache.clone();
let rpc_client = self.rpc_client.clone();
let shutdown = self.shutdown.clone();
let handle = tokio::spawn(async move {
Self::listen_loop(listeners, cache, rpc_client, shutdown).await;
});
*self.listen_task.write() = Some(handle);
info!("ConfigService started");
Ok(())
}
pub async fn stop(&self) {
*self.started.write() = false;
self.shutdown.notify_one();
if let Some(handle) = self.listen_task.write().take() {
handle.abort();
}
info!("ConfigService stopped");
}
pub async fn get_config(&self, data_id: &str, group: &str) -> Result<String> {
self.get_config_with_timeout(data_id, group, 3000).await
}
pub async fn get_config_with_timeout(
&self,
data_id: &str,
group: &str,
_timeout_ms: u64,
) -> Result<String> {
let group = if group.is_empty() { DEFAULT_GROUP } else { group };
if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
return Ok(config.content);
}
let request = ConfigQueryRequest::new(data_id, group, &self.namespace);
match self.rpc_client.request::<_, ConfigQueryResponse>(&request).await {
Ok(response) => {
if response.response.error_code == ConfigQueryResponse::CONFIG_NOT_FOUND {
return Err(BatataError::ConfigNotFound {
data_id: data_id.to_string(),
group: group.to_string(),
namespace: self.namespace.clone(),
});
}
let mut config = ConfigInfo::new(data_id, group, &self.namespace);
config.content = response.content.clone();
config.md5 = response.md5.clone();
config.last_modified = response.last_modified;
config.content_type = response.content_type.clone();
self.cache.put(config.clone());
if let Some(file_cache) = &self.file_cache {
if let Err(e) = file_cache.save_config(&config) {
warn!("Failed to save config to file cache: {}", e);
}
}
Ok(response.content)
}
Err(e) => {
if self.cache_config.failover_enabled {
if let Some(file_cache) = &self.file_cache {
if let Some(config) = file_cache.load_config(data_id, group, &self.namespace)
{
warn!(
"Using cached config due to server error: {} (dataId={}, group={})",
e, data_id, group
);
if self.cache_config.update_cache_when_empty {
self.cache.put(config.clone());
}
return Ok(config.content);
}
}
}
Err(e)
}
}
}
pub async fn get_config_and_sign_listener<L>(
&self,
data_id: &str,
group: &str,
listener: L,
) -> Result<String>
where
L: ConfigListener + 'static,
{
let content = self.get_config(data_id, group).await?;
self.add_listener(data_id, group, listener);
let md5 = md5_hash(&content);
self.listeners
.set_md5(data_id, group, &self.namespace, &md5);
Ok(content)
}
pub async fn publish_config(&self, data_id: &str, group: &str, content: &str) -> Result<bool> {
self.publish_config_with_type(data_id, group, content, None)
.await
}
pub async fn publish_config_with_type(
&self,
data_id: &str,
group: &str,
content: &str,
config_type: Option<&str>,
) -> Result<bool> {
let group = if group.is_empty() { DEFAULT_GROUP } else { group };
let mut request = ConfigPublishRequest::new(data_id, group, &self.namespace, content);
if let Some(t) = config_type {
request = request.with_type(t);
}
let response: ConfigPublishResponse = self.rpc_client.request(&request).await?;
if response.response.success {
let mut config = ConfigInfo::new(data_id, group, &self.namespace);
config.update_content(content);
self.cache.put(config);
}
Ok(response.response.success)
}
pub async fn remove_config(&self, data_id: &str, group: &str) -> Result<bool> {
let group = if group.is_empty() { DEFAULT_GROUP } else { group };
let request = ConfigRemoveRequest::new(data_id, group, &self.namespace);
let response: ConfigRemoveResponse = self.rpc_client.request(&request).await?;
if response.response.success {
self.cache.remove(data_id, group, &self.namespace);
}
Ok(response.response.success)
}
pub fn add_listener<L>(&self, data_id: &str, group: &str, listener: L)
where
L: ConfigListener + 'static,
{
let group = if group.is_empty() { DEFAULT_GROUP } else { group };
self.listeners
.add_listener(data_id, group, &self.namespace, Arc::new(listener));
if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
self.listeners
.set_md5(data_id, group, &self.namespace, &config.md5);
}
}
pub fn add_callback_listener<F>(&self, data_id: &str, group: &str, callback: F)
where
F: Fn(ConfigChangeEvent) + Send + Sync + 'static,
{
self.add_listener(data_id, group, CallbackListener::new(callback));
}
pub fn remove_listener(&self, data_id: &str, group: &str) {
let group = if group.is_empty() { DEFAULT_GROUP } else { group };
self.listeners.remove_listener(data_id, group, &self.namespace);
}
pub async fn search_config(
&self,
data_id_pattern: &str,
group_pattern: &str,
page_no: i32,
page_size: i32,
) -> Result<(i32, Vec<ConfigSearchItem>)> {
let request = ConfigSearchRequest::new(&self.namespace)
.with_data_id(data_id_pattern)
.with_group(group_pattern)
.with_page(page_no, page_size);
let response: ConfigSearchResponse = self.rpc_client.request(&request).await?;
Ok((response.total_count, response.page_items))
}
pub async fn search_config_blur(
&self,
data_id_pattern: &str,
group_pattern: &str,
) -> Result<Vec<ConfigSearchItem>> {
let (_, items) = self.search_config(data_id_pattern, group_pattern, 1, 1000).await?;
Ok(items)
}
pub async fn get_server_status(&self) -> Result<String> {
if self.rpc_client.is_connected() {
Ok("UP".to_string())
} else {
Ok("DOWN".to_string())
}
}
async fn listen_loop(
listeners: Arc<ListenerRegistry>,
cache: Arc<ConfigCache>,
rpc_client: Arc<RpcClient>,
shutdown: Arc<Notify>,
) {
let listen_interval = Duration::from_secs(30);
loop {
tokio::select! {
_ = shutdown.notified() => {
info!("Listen loop shutdown");
break;
}
_ = tokio::time::sleep(listen_interval) => {
if listeners.listener_count() == 0 {
continue;
}
let contexts = listeners.get_listen_contexts();
if contexts.is_empty() {
continue;
}
let mut request = ConfigBatchListenRequest::new(true);
for (data_id, group, tenant, md5) in contexts {
request = request.add_context(ConfigListenContext::new(
&data_id, &group, &tenant, &md5,
));
}
match rpc_client.request::<_, ConfigChangeBatchListenResponse>(&request).await {
Ok(response) => {
for changed in response.changed_configs {
debug!(
"Config changed: dataId={}, group={}, tenant={}",
changed.data_id, changed.group, changed.tenant
);
let query_request = ConfigQueryRequest::new(
&changed.data_id,
&changed.group,
&changed.tenant,
);
match rpc_client.request::<_, ConfigQueryResponse>(&query_request).await {
Ok(query_response) => {
let old_content = cache
.get(&changed.data_id, &changed.group, &changed.tenant)
.map(|c| c.content);
let event = ConfigChangeEvent::new(
&changed.data_id,
&changed.group,
&changed.tenant,
old_content,
query_response.content.clone(),
ConfigChangeType::Modify,
);
let mut config = ConfigInfo::new(
&changed.data_id,
&changed.group,
&changed.tenant,
);
config.content = query_response.content;
config.md5 = query_response.md5.clone();
config.last_modified = query_response.last_modified;
cache.put(config);
listeners.set_md5(
&changed.data_id,
&changed.group,
&changed.tenant,
&query_response.md5,
);
listeners.notify_change(event).await;
}
Err(e) => {
error!("Failed to fetch changed config: {}", e);
}
}
}
}
Err(e) => {
warn!("Config listen request failed: {}", e);
}
}
}
}
}
}
}
impl Drop for ConfigService {
fn drop(&mut self) {
self.shutdown.notify_one();
}
}