1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, error, info, warn};
8
9use crate::api::config::{
10 ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigInfo, ConfigListenContext,
11 ConfigPublishRequest, ConfigPublishResponse, ConfigQueryRequest, ConfigQueryResponse,
12 ConfigRemoveRequest, ConfigRemoveResponse, ConfigSearchRequest, ConfigSearchResponse,
13 ConfigSearchItem,
14};
15use crate::cache::FileCache;
16use crate::common::{md5_hash, DEFAULT_GROUP};
17use crate::config::{
18 CallbackListener, ConfigCache, ConfigChangeEvent, ConfigChangeType, ConfigListener,
19 ListenerRegistry,
20};
21use crate::error::{BatataError, Result};
22use crate::remote::RpcClient;
23use crate::CacheConfig;
24
/// Client-side configuration service.
///
/// Combines an RPC client, an in-memory cache, an optional on-disk cache and a
/// listener registry, and owns the background task that polls the server for
/// configuration changes.
pub struct ConfigService {
    /// RPC client used for every request sent to the server.
    rpc_client: Arc<RpcClient>,

    /// In-memory cache of configs, keyed by data id, group and namespace.
    cache: Arc<ConfigCache>,

    /// Optional file-backed cache; `None` when no cache directory is configured
    /// or the cache could not be created.
    file_cache: Option<Arc<FileCache>>,

    /// Cache settings (cache directory, failover and cache-update flags).
    cache_config: CacheConfig,

    /// Registry of change listeners and the MD5 tracked for each listened config.
    listeners: Arc<ListenerRegistry>,

    /// Namespace (tenant) that this service's requests and cache lookups use.
    namespace: String,

    /// Whether `start` has been called without a subsequent `stop`.
    started: Arc<RwLock<bool>>,

    /// Handle of the spawned listen loop, if one has been stored by `start`.
    listen_task: Arc<RwLock<Option<JoinHandle<()>>>>,

    /// Notification used to ask the listen loop to exit.
    shutdown: Arc<Notify>,
}
54
55impl ConfigService {
56 pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
58 let file_cache = cache_config
60 .cache_dir
61 .as_ref()
62 .and_then(|dir| FileCache::new(dir).ok())
63 .map(Arc::new);
64
65 Self {
66 rpc_client,
67 cache: Arc::new(ConfigCache::new()),
68 file_cache,
69 cache_config,
70 listeners: Arc::new(ListenerRegistry::new()),
71 namespace: namespace.to_string(),
72 started: Arc::new(RwLock::new(false)),
73 listen_task: Arc::new(RwLock::new(None)),
74 shutdown: Arc::new(Notify::new()),
75 }
76 }
77
78 pub async fn start(&self) -> Result<()> {
80 if *self.started.read() {
81 return Err(BatataError::ClientAlreadyStarted);
82 }
83
84 *self.started.write() = true;
85
86 let listeners = self.listeners.clone();
88 let cache = self.cache.clone();
89 let rpc_client = self.rpc_client.clone();
90 let shutdown = self.shutdown.clone();
91
92 let handle = tokio::spawn(async move {
93 Self::listen_loop(listeners, cache, rpc_client, shutdown).await;
94 });
95
96 *self.listen_task.write() = Some(handle);
97
98 info!("ConfigService started");
99 Ok(())
100 }
101
102 pub async fn stop(&self) {
104 *self.started.write() = false;
105 self.shutdown.notify_one();
106
107 if let Some(handle) = self.listen_task.write().take() {
108 handle.abort();
109 }
110
111 info!("ConfigService stopped");
112 }
113
114 pub async fn get_config(&self, data_id: &str, group: &str) -> Result<String> {
116 self.get_config_with_timeout(data_id, group, 3000).await
117 }
118
119 pub async fn get_config_with_timeout(
121 &self,
122 data_id: &str,
123 group: &str,
124 _timeout_ms: u64,
125 ) -> Result<String> {
126 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
127
128 if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
130 return Ok(config.content);
131 }
132
133 let request = ConfigQueryRequest::new(data_id, group, &self.namespace);
135
136 match self.rpc_client.request::<_, ConfigQueryResponse>(&request).await {
137 Ok(response) => {
138 if response.response.error_code == ConfigQueryResponse::CONFIG_NOT_FOUND {
139 return Err(BatataError::ConfigNotFound {
140 data_id: data_id.to_string(),
141 group: group.to_string(),
142 namespace: self.namespace.clone(),
143 });
144 }
145
146 let mut config = ConfigInfo::new(data_id, group, &self.namespace);
148 config.content = response.content.clone();
149 config.md5 = response.md5.clone();
150 config.last_modified = response.last_modified;
151 config.content_type = response.content_type.clone();
152 self.cache.put(config.clone());
153
154 if let Some(file_cache) = &self.file_cache {
156 if let Err(e) = file_cache.save_config(&config) {
157 warn!("Failed to save config to file cache: {}", e);
158 }
159 }
160
161 Ok(response.content)
162 }
163 Err(e) => {
164 if self.cache_config.failover_enabled {
166 if let Some(file_cache) = &self.file_cache {
167 if let Some(config) = file_cache.load_config(data_id, group, &self.namespace)
168 {
169 warn!(
170 "Using cached config due to server error: {} (dataId={}, group={})",
171 e, data_id, group
172 );
173
174 if self.cache_config.update_cache_when_empty {
176 self.cache.put(config.clone());
177 }
178
179 return Ok(config.content);
180 }
181 }
182 }
183 Err(e)
184 }
185 }
186 }
187
188 pub async fn get_config_and_sign_listener<L>(
190 &self,
191 data_id: &str,
192 group: &str,
193 listener: L,
194 ) -> Result<String>
195 where
196 L: ConfigListener + 'static,
197 {
198 let content = self.get_config(data_id, group).await?;
199
200 self.add_listener(data_id, group, listener);
202
203 let md5 = md5_hash(&content);
205 self.listeners
206 .set_md5(data_id, group, &self.namespace, &md5);
207
208 Ok(content)
209 }
210
211 pub async fn publish_config(&self, data_id: &str, group: &str, content: &str) -> Result<bool> {
213 self.publish_config_with_type(data_id, group, content, None)
214 .await
215 }
216
217 pub async fn publish_config_with_type(
219 &self,
220 data_id: &str,
221 group: &str,
222 content: &str,
223 config_type: Option<&str>,
224 ) -> Result<bool> {
225 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
226
227 let mut request = ConfigPublishRequest::new(data_id, group, &self.namespace, content);
228
229 if let Some(t) = config_type {
230 request = request.with_type(t);
231 }
232
233 let response: ConfigPublishResponse = self.rpc_client.request(&request).await?;
234
235 if response.response.success {
236 let mut config = ConfigInfo::new(data_id, group, &self.namespace);
238 config.update_content(content);
239 self.cache.put(config);
240 }
241
242 Ok(response.response.success)
243 }
244
245 pub async fn remove_config(&self, data_id: &str, group: &str) -> Result<bool> {
247 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
248
249 let request = ConfigRemoveRequest::new(data_id, group, &self.namespace);
250
251 let response: ConfigRemoveResponse = self.rpc_client.request(&request).await?;
252
253 if response.response.success {
254 self.cache.remove(data_id, group, &self.namespace);
255 }
256
257 Ok(response.response.success)
258 }
259
260 pub fn add_listener<L>(&self, data_id: &str, group: &str, listener: L)
262 where
263 L: ConfigListener + 'static,
264 {
265 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
266
267 self.listeners
268 .add_listener(data_id, group, &self.namespace, Arc::new(listener));
269
270 if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
272 self.listeners
273 .set_md5(data_id, group, &self.namespace, &config.md5);
274 }
275 }
276
277 pub fn add_callback_listener<F>(&self, data_id: &str, group: &str, callback: F)
279 where
280 F: Fn(ConfigChangeEvent) + Send + Sync + 'static,
281 {
282 self.add_listener(data_id, group, CallbackListener::new(callback));
283 }
284
285 pub fn remove_listener(&self, data_id: &str, group: &str) {
287 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
288 self.listeners.remove_listener(data_id, group, &self.namespace);
289 }
290
291 pub async fn search_config(
293 &self,
294 data_id_pattern: &str,
295 group_pattern: &str,
296 page_no: i32,
297 page_size: i32,
298 ) -> Result<(i32, Vec<ConfigSearchItem>)> {
299 let request = ConfigSearchRequest::new(&self.namespace)
300 .with_data_id(data_id_pattern)
301 .with_group(group_pattern)
302 .with_page(page_no, page_size);
303
304 let response: ConfigSearchResponse = self.rpc_client.request(&request).await?;
305
306 Ok((response.total_count, response.page_items))
307 }
308
309 pub async fn search_config_blur(
311 &self,
312 data_id_pattern: &str,
313 group_pattern: &str,
314 ) -> Result<Vec<ConfigSearchItem>> {
315 let (_, items) = self.search_config(data_id_pattern, group_pattern, 1, 1000).await?;
316 Ok(items)
317 }
318
319 pub async fn get_server_status(&self) -> Result<String> {
321 if self.rpc_client.is_connected() {
322 Ok("UP".to_string())
323 } else {
324 Ok("DOWN".to_string())
325 }
326 }
327
328 async fn listen_loop(
330 listeners: Arc<ListenerRegistry>,
331 cache: Arc<ConfigCache>,
332 rpc_client: Arc<RpcClient>,
333 shutdown: Arc<Notify>,
334 ) {
335 let listen_interval = Duration::from_secs(30);
336
337 loop {
338 tokio::select! {
339 _ = shutdown.notified() => {
340 info!("Listen loop shutdown");
341 break;
342 }
343 _ = tokio::time::sleep(listen_interval) => {
344 if listeners.listener_count() == 0 {
345 continue;
346 }
347
348 let contexts = listeners.get_listen_contexts();
350 if contexts.is_empty() {
351 continue;
352 }
353
354 let mut request = ConfigBatchListenRequest::new(true);
355 for (data_id, group, tenant, md5) in contexts {
356 request = request.add_context(ConfigListenContext::new(
357 &data_id, &group, &tenant, &md5,
358 ));
359 }
360
361 match rpc_client.request::<_, ConfigChangeBatchListenResponse>(&request).await {
363 Ok(response) => {
364 for changed in response.changed_configs {
365 debug!(
366 "Config changed: dataId={}, group={}, tenant={}",
367 changed.data_id, changed.group, changed.tenant
368 );
369
370 let query_request = ConfigQueryRequest::new(
372 &changed.data_id,
373 &changed.group,
374 &changed.tenant,
375 );
376
377 match rpc_client.request::<_, ConfigQueryResponse>(&query_request).await {
378 Ok(query_response) => {
379 let old_content = cache
380 .get(&changed.data_id, &changed.group, &changed.tenant)
381 .map(|c| c.content);
382
383 let event = ConfigChangeEvent::new(
384 &changed.data_id,
385 &changed.group,
386 &changed.tenant,
387 old_content,
388 query_response.content.clone(),
389 ConfigChangeType::Modify,
390 );
391
392 let mut config = ConfigInfo::new(
394 &changed.data_id,
395 &changed.group,
396 &changed.tenant,
397 );
398 config.content = query_response.content;
399 config.md5 = query_response.md5.clone();
400 config.last_modified = query_response.last_modified;
401 cache.put(config);
402
403 listeners.set_md5(
405 &changed.data_id,
406 &changed.group,
407 &changed.tenant,
408 &query_response.md5,
409 );
410
411 listeners.notify_change(event).await;
413 }
414 Err(e) => {
415 error!("Failed to fetch changed config: {}", e);
416 }
417 }
418 }
419 }
420 Err(e) => {
421 warn!("Config listen request failed: {}", e);
422 }
423 }
424 }
425 }
426 }
427 }
428}
429
430impl Drop for ConfigService {
431 fn drop(&mut self) {
432 self.shutdown.notify_one();
433 }
434}