1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, error, info, warn};
8
9use crate::api::config::{
10 ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigInfo, ConfigListenContext,
11 ConfigPublishRequest, ConfigPublishResponse, ConfigQueryRequest, ConfigQueryResponse,
12 ConfigRemoveRequest, ConfigRemoveResponse, ConfigSearchRequest, ConfigSearchResponse,
13 ConfigSearchItem,
14};
15use crate::common::{md5_hash, DEFAULT_GROUP};
16use crate::config::{
17 CallbackListener, ConfigCache, ConfigChangeEvent, ConfigChangeType, ConfigListener,
18 ListenerRegistry,
19};
20use crate::error::{BatataError, Result};
21use crate::remote::RpcClient;
22
/// Client-side configuration service.
///
/// Wraps an [`RpcClient`] with a local [`ConfigCache`] and a
/// [`ListenerRegistry`], and owns the background task that polls the server
/// for configuration changes.
pub struct ConfigService {
    /// RPC client used for every query, publish, remove, search and listen request.
    rpc_client: Arc<RpcClient>,

    /// Local cache of configs; read before querying the server and updated
    /// after successful queries, publishes, removals and change notifications.
    cache: Arc<ConfigCache>,

    /// Registered change listeners together with the md5 tracked per config.
    listeners: Arc<ListenerRegistry>,

    /// Namespace passed along with every request and cache/listener lookup.
    namespace: String,

    /// `true` between a successful `start` and the next `stop`.
    started: Arc<RwLock<bool>>,

    /// Handle of the spawned listen loop, if one has been started.
    listen_task: Arc<RwLock<Option<JoinHandle<()>>>>,

    /// Signal the listen loop waits on to know it should exit.
    shutdown: Arc<Notify>,
}
46
47impl ConfigService {
48 pub fn new(rpc_client: Arc<RpcClient>, namespace: &str) -> Self {
50 Self {
51 rpc_client,
52 cache: Arc::new(ConfigCache::new()),
53 listeners: Arc::new(ListenerRegistry::new()),
54 namespace: namespace.to_string(),
55 started: Arc::new(RwLock::new(false)),
56 listen_task: Arc::new(RwLock::new(None)),
57 shutdown: Arc::new(Notify::new()),
58 }
59 }
60
61 pub async fn start(&self) -> Result<()> {
63 if *self.started.read() {
64 return Err(BatataError::ClientAlreadyStarted);
65 }
66
67 *self.started.write() = true;
68
69 let listeners = self.listeners.clone();
71 let cache = self.cache.clone();
72 let rpc_client = self.rpc_client.clone();
73 let shutdown = self.shutdown.clone();
74
75 let handle = tokio::spawn(async move {
76 Self::listen_loop(listeners, cache, rpc_client, shutdown).await;
77 });
78
79 *self.listen_task.write() = Some(handle);
80
81 info!("ConfigService started");
82 Ok(())
83 }
84
85 pub async fn stop(&self) {
87 *self.started.write() = false;
88 self.shutdown.notify_one();
89
90 if let Some(handle) = self.listen_task.write().take() {
91 handle.abort();
92 }
93
94 info!("ConfigService stopped");
95 }
96
97 pub async fn get_config(&self, data_id: &str, group: &str) -> Result<String> {
99 self.get_config_with_timeout(data_id, group, 3000).await
100 }
101
102 pub async fn get_config_with_timeout(
104 &self,
105 data_id: &str,
106 group: &str,
107 _timeout_ms: u64,
108 ) -> Result<String> {
109 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
110
111 if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
113 return Ok(config.content);
114 }
115
116 let request = ConfigQueryRequest::new(data_id, group, &self.namespace);
118
119 let response: ConfigQueryResponse = self.rpc_client.request(&request).await?;
120
121 if response.response.error_code == ConfigQueryResponse::CONFIG_NOT_FOUND {
122 return Err(BatataError::ConfigNotFound {
123 data_id: data_id.to_string(),
124 group: group.to_string(),
125 namespace: self.namespace.clone(),
126 });
127 }
128
129 let mut config = ConfigInfo::new(data_id, group, &self.namespace);
131 config.content = response.content.clone();
132 config.md5 = response.md5.clone();
133 config.last_modified = response.last_modified;
134 config.content_type = response.content_type.clone();
135 self.cache.put(config);
136
137 Ok(response.content)
138 }
139
140 pub async fn get_config_and_sign_listener<L>(
142 &self,
143 data_id: &str,
144 group: &str,
145 listener: L,
146 ) -> Result<String>
147 where
148 L: ConfigListener + 'static,
149 {
150 let content = self.get_config(data_id, group).await?;
151
152 self.add_listener(data_id, group, listener);
154
155 let md5 = md5_hash(&content);
157 self.listeners
158 .set_md5(data_id, group, &self.namespace, &md5);
159
160 Ok(content)
161 }
162
163 pub async fn publish_config(&self, data_id: &str, group: &str, content: &str) -> Result<bool> {
165 self.publish_config_with_type(data_id, group, content, None)
166 .await
167 }
168
169 pub async fn publish_config_with_type(
171 &self,
172 data_id: &str,
173 group: &str,
174 content: &str,
175 config_type: Option<&str>,
176 ) -> Result<bool> {
177 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
178
179 let mut request = ConfigPublishRequest::new(data_id, group, &self.namespace, content);
180
181 if let Some(t) = config_type {
182 request = request.with_type(t);
183 }
184
185 let response: ConfigPublishResponse = self.rpc_client.request(&request).await?;
186
187 if response.response.success {
188 let mut config = ConfigInfo::new(data_id, group, &self.namespace);
190 config.update_content(content);
191 self.cache.put(config);
192 }
193
194 Ok(response.response.success)
195 }
196
197 pub async fn remove_config(&self, data_id: &str, group: &str) -> Result<bool> {
199 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
200
201 let request = ConfigRemoveRequest::new(data_id, group, &self.namespace);
202
203 let response: ConfigRemoveResponse = self.rpc_client.request(&request).await?;
204
205 if response.response.success {
206 self.cache.remove(data_id, group, &self.namespace);
207 }
208
209 Ok(response.response.success)
210 }
211
212 pub fn add_listener<L>(&self, data_id: &str, group: &str, listener: L)
214 where
215 L: ConfigListener + 'static,
216 {
217 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
218
219 self.listeners
220 .add_listener(data_id, group, &self.namespace, Arc::new(listener));
221
222 if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
224 self.listeners
225 .set_md5(data_id, group, &self.namespace, &config.md5);
226 }
227 }
228
229 pub fn add_callback_listener<F>(&self, data_id: &str, group: &str, callback: F)
231 where
232 F: Fn(ConfigChangeEvent) + Send + Sync + 'static,
233 {
234 self.add_listener(data_id, group, CallbackListener::new(callback));
235 }
236
237 pub fn remove_listener(&self, data_id: &str, group: &str) {
239 let group = if group.is_empty() { DEFAULT_GROUP } else { group };
240 self.listeners.remove_listener(data_id, group, &self.namespace);
241 }
242
243 pub async fn search_config(
245 &self,
246 data_id_pattern: &str,
247 group_pattern: &str,
248 page_no: i32,
249 page_size: i32,
250 ) -> Result<(i32, Vec<ConfigSearchItem>)> {
251 let request = ConfigSearchRequest::new(&self.namespace)
252 .with_data_id(data_id_pattern)
253 .with_group(group_pattern)
254 .with_page(page_no, page_size);
255
256 let response: ConfigSearchResponse = self.rpc_client.request(&request).await?;
257
258 Ok((response.total_count, response.page_items))
259 }
260
261 pub async fn search_config_blur(
263 &self,
264 data_id_pattern: &str,
265 group_pattern: &str,
266 ) -> Result<Vec<ConfigSearchItem>> {
267 let (_, items) = self.search_config(data_id_pattern, group_pattern, 1, 1000).await?;
268 Ok(items)
269 }
270
271 pub async fn get_server_status(&self) -> Result<String> {
273 if self.rpc_client.is_connected() {
274 Ok("UP".to_string())
275 } else {
276 Ok("DOWN".to_string())
277 }
278 }
279
280 async fn listen_loop(
282 listeners: Arc<ListenerRegistry>,
283 cache: Arc<ConfigCache>,
284 rpc_client: Arc<RpcClient>,
285 shutdown: Arc<Notify>,
286 ) {
287 let listen_interval = Duration::from_secs(30);
288
289 loop {
290 tokio::select! {
291 _ = shutdown.notified() => {
292 info!("Listen loop shutdown");
293 break;
294 }
295 _ = tokio::time::sleep(listen_interval) => {
296 if listeners.listener_count() == 0 {
297 continue;
298 }
299
300 let contexts = listeners.get_listen_contexts();
302 if contexts.is_empty() {
303 continue;
304 }
305
306 let mut request = ConfigBatchListenRequest::new(true);
307 for (data_id, group, tenant, md5) in contexts {
308 request = request.add_context(ConfigListenContext::new(
309 &data_id, &group, &tenant, &md5,
310 ));
311 }
312
313 match rpc_client.request::<_, ConfigChangeBatchListenResponse>(&request).await {
315 Ok(response) => {
316 for changed in response.changed_configs {
317 debug!(
318 "Config changed: dataId={}, group={}, tenant={}",
319 changed.data_id, changed.group, changed.tenant
320 );
321
322 let query_request = ConfigQueryRequest::new(
324 &changed.data_id,
325 &changed.group,
326 &changed.tenant,
327 );
328
329 match rpc_client.request::<_, ConfigQueryResponse>(&query_request).await {
330 Ok(query_response) => {
331 let old_content = cache
332 .get(&changed.data_id, &changed.group, &changed.tenant)
333 .map(|c| c.content);
334
335 let event = ConfigChangeEvent::new(
336 &changed.data_id,
337 &changed.group,
338 &changed.tenant,
339 old_content,
340 query_response.content.clone(),
341 ConfigChangeType::Modify,
342 );
343
344 let mut config = ConfigInfo::new(
346 &changed.data_id,
347 &changed.group,
348 &changed.tenant,
349 );
350 config.content = query_response.content;
351 config.md5 = query_response.md5.clone();
352 config.last_modified = query_response.last_modified;
353 cache.put(config);
354
355 listeners.set_md5(
357 &changed.data_id,
358 &changed.group,
359 &changed.tenant,
360 &query_response.md5,
361 );
362
363 listeners.notify_change(event).await;
365 }
366 Err(e) => {
367 error!("Failed to fetch changed config: {}", e);
368 }
369 }
370 }
371 }
372 Err(e) => {
373 warn!("Config listen request failed: {}", e);
374 }
375 }
376 }
377 }
378 }
379 }
380}
381
382impl Drop for ConfigService {
383 fn drop(&mut self) {
384 self.shutdown.notify_one();
385 }
386}