batata_client/config/
config_service.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, error, info, warn};
8
9use crate::api::config::{
10    ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigInfo, ConfigListenContext,
11    ConfigPublishRequest, ConfigPublishResponse, ConfigQueryRequest, ConfigQueryResponse,
12    ConfigRemoveRequest, ConfigRemoveResponse, ConfigSearchRequest, ConfigSearchResponse,
13    ConfigSearchItem,
14};
15use crate::cache::FileCache;
16use crate::common::{md5_hash, DEFAULT_GROUP};
17use crate::config::{
18    CallbackListener, ConfigCache, ConfigChangeEvent, ConfigChangeType, ConfigListener,
19    ListenerRegistry,
20};
21use crate::error::{BatataError, Result};
22use crate::remote::RpcClient;
23use crate::CacheConfig;
24
25/// Configuration service for managing configurations
26pub struct ConfigService {
27    /// RPC client for server communication
28    rpc_client: Arc<RpcClient>,
29
30    /// Local configuration cache
31    cache: Arc<ConfigCache>,
32
33    /// File cache for failover
34    file_cache: Option<Arc<FileCache>>,
35
36    /// Cache configuration
37    cache_config: CacheConfig,
38
39    /// Listener registry
40    listeners: Arc<ListenerRegistry>,
41
42    /// Namespace
43    namespace: String,
44
45    /// Whether the service is started
46    started: Arc<RwLock<bool>>,
47
48    /// Background task handle
49    listen_task: Arc<RwLock<Option<JoinHandle<()>>>>,
50
51    /// Shutdown notify
52    shutdown: Arc<Notify>,
53}
54
55impl ConfigService {
56    /// Create a new ConfigService
57    pub fn new(rpc_client: Arc<RpcClient>, namespace: &str, cache_config: CacheConfig) -> Self {
58        // Create file cache if cache directory is configured
59        let file_cache = cache_config
60            .cache_dir
61            .as_ref()
62            .and_then(|dir| FileCache::new(dir).ok())
63            .map(Arc::new);
64
65        Self {
66            rpc_client,
67            cache: Arc::new(ConfigCache::new()),
68            file_cache,
69            cache_config,
70            listeners: Arc::new(ListenerRegistry::new()),
71            namespace: namespace.to_string(),
72            started: Arc::new(RwLock::new(false)),
73            listen_task: Arc::new(RwLock::new(None)),
74            shutdown: Arc::new(Notify::new()),
75        }
76    }
77
78    /// Start the config service
79    pub async fn start(&self) -> Result<()> {
80        if *self.started.read() {
81            return Err(BatataError::ClientAlreadyStarted);
82        }
83
84        *self.started.write() = true;
85
86        // Start background listener task
87        let listeners = self.listeners.clone();
88        let cache = self.cache.clone();
89        let rpc_client = self.rpc_client.clone();
90        let shutdown = self.shutdown.clone();
91
92        let handle = tokio::spawn(async move {
93            Self::listen_loop(listeners, cache, rpc_client, shutdown).await;
94        });
95
96        *self.listen_task.write() = Some(handle);
97
98        info!("ConfigService started");
99        Ok(())
100    }
101
102    /// Stop the config service
103    pub async fn stop(&self) {
104        *self.started.write() = false;
105        self.shutdown.notify_one();
106
107        if let Some(handle) = self.listen_task.write().take() {
108            handle.abort();
109        }
110
111        info!("ConfigService stopped");
112    }
113
114    /// Get configuration
115    pub async fn get_config(&self, data_id: &str, group: &str) -> Result<String> {
116        self.get_config_with_timeout(data_id, group, 3000).await
117    }
118
119    /// Get configuration with timeout
120    pub async fn get_config_with_timeout(
121        &self,
122        data_id: &str,
123        group: &str,
124        _timeout_ms: u64,
125    ) -> Result<String> {
126        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
127
128        // Try memory cache first
129        if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
130            return Ok(config.content);
131        }
132
133        // Fetch from server
134        let request = ConfigQueryRequest::new(data_id, group, &self.namespace);
135
136        match self.rpc_client.request::<_, ConfigQueryResponse>(&request).await {
137            Ok(response) => {
138                if response.response.error_code == ConfigQueryResponse::CONFIG_NOT_FOUND {
139                    return Err(BatataError::ConfigNotFound {
140                        data_id: data_id.to_string(),
141                        group: group.to_string(),
142                        namespace: self.namespace.clone(),
143                    });
144                }
145
146                // Update memory cache
147                let mut config = ConfigInfo::new(data_id, group, &self.namespace);
148                config.content = response.content.clone();
149                config.md5 = response.md5.clone();
150                config.last_modified = response.last_modified;
151                config.content_type = response.content_type.clone();
152                self.cache.put(config.clone());
153
154                // Save to file cache for failover
155                if let Some(file_cache) = &self.file_cache {
156                    if let Err(e) = file_cache.save_config(&config) {
157                        warn!("Failed to save config to file cache: {}", e);
158                    }
159                }
160
161                Ok(response.content)
162            }
163            Err(e) => {
164                // Failover: try file cache
165                if self.cache_config.failover_enabled {
166                    if let Some(file_cache) = &self.file_cache {
167                        if let Some(config) = file_cache.load_config(data_id, group, &self.namespace)
168                        {
169                            warn!(
170                                "Using cached config due to server error: {} (dataId={}, group={})",
171                                e, data_id, group
172                            );
173
174                            // Optionally update memory cache
175                            if self.cache_config.update_cache_when_empty {
176                                self.cache.put(config.clone());
177                            }
178
179                            return Ok(config.content);
180                        }
181                    }
182                }
183                Err(e)
184            }
185        }
186    }
187
188    /// Get configuration and sign listener
189    pub async fn get_config_and_sign_listener<L>(
190        &self,
191        data_id: &str,
192        group: &str,
193        listener: L,
194    ) -> Result<String>
195    where
196        L: ConfigListener + 'static,
197    {
198        let content = self.get_config(data_id, group).await?;
199
200        // Add listener
201        self.add_listener(data_id, group, listener);
202
203        // Update MD5 in listener registry
204        let md5 = md5_hash(&content);
205        self.listeners
206            .set_md5(data_id, group, &self.namespace, &md5);
207
208        Ok(content)
209    }
210
211    /// Publish configuration
212    pub async fn publish_config(&self, data_id: &str, group: &str, content: &str) -> Result<bool> {
213        self.publish_config_with_type(data_id, group, content, None)
214            .await
215    }
216
217    /// Publish configuration with type
218    pub async fn publish_config_with_type(
219        &self,
220        data_id: &str,
221        group: &str,
222        content: &str,
223        config_type: Option<&str>,
224    ) -> Result<bool> {
225        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
226
227        let mut request = ConfigPublishRequest::new(data_id, group, &self.namespace, content);
228
229        if let Some(t) = config_type {
230            request = request.with_type(t);
231        }
232
233        let response: ConfigPublishResponse = self.rpc_client.request(&request).await?;
234
235        if response.response.success {
236            // Update cache
237            let mut config = ConfigInfo::new(data_id, group, &self.namespace);
238            config.update_content(content);
239            self.cache.put(config);
240        }
241
242        Ok(response.response.success)
243    }
244
245    /// Remove configuration
246    pub async fn remove_config(&self, data_id: &str, group: &str) -> Result<bool> {
247        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
248
249        let request = ConfigRemoveRequest::new(data_id, group, &self.namespace);
250
251        let response: ConfigRemoveResponse = self.rpc_client.request(&request).await?;
252
253        if response.response.success {
254            self.cache.remove(data_id, group, &self.namespace);
255        }
256
257        Ok(response.response.success)
258    }
259
260    /// Add listener for configuration changes
261    pub fn add_listener<L>(&self, data_id: &str, group: &str, listener: L)
262    where
263        L: ConfigListener + 'static,
264    {
265        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
266
267        self.listeners
268            .add_listener(data_id, group, &self.namespace, Arc::new(listener));
269
270        // Set initial MD5 from cache
271        if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
272            self.listeners
273                .set_md5(data_id, group, &self.namespace, &config.md5);
274        }
275    }
276
277    /// Add callback listener
278    pub fn add_callback_listener<F>(&self, data_id: &str, group: &str, callback: F)
279    where
280        F: Fn(ConfigChangeEvent) + Send + Sync + 'static,
281    {
282        self.add_listener(data_id, group, CallbackListener::new(callback));
283    }
284
285    /// Remove listener
286    pub fn remove_listener(&self, data_id: &str, group: &str) {
287        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
288        self.listeners.remove_listener(data_id, group, &self.namespace);
289    }
290
291    /// Search configurations with pagination
292    pub async fn search_config(
293        &self,
294        data_id_pattern: &str,
295        group_pattern: &str,
296        page_no: i32,
297        page_size: i32,
298    ) -> Result<(i32, Vec<ConfigSearchItem>)> {
299        let request = ConfigSearchRequest::new(&self.namespace)
300            .with_data_id(data_id_pattern)
301            .with_group(group_pattern)
302            .with_page(page_no, page_size);
303
304        let response: ConfigSearchResponse = self.rpc_client.request(&request).await?;
305
306        Ok((response.total_count, response.page_items))
307    }
308
309    /// Search configurations with blur matching
310    pub async fn search_config_blur(
311        &self,
312        data_id_pattern: &str,
313        group_pattern: &str,
314    ) -> Result<Vec<ConfigSearchItem>> {
315        let (_, items) = self.search_config(data_id_pattern, group_pattern, 1, 1000).await?;
316        Ok(items)
317    }
318
319    /// Get server status
320    pub async fn get_server_status(&self) -> Result<String> {
321        if self.rpc_client.is_connected() {
322            Ok("UP".to_string())
323        } else {
324            Ok("DOWN".to_string())
325        }
326    }
327
328    /// Background listen loop for configuration changes
329    async fn listen_loop(
330        listeners: Arc<ListenerRegistry>,
331        cache: Arc<ConfigCache>,
332        rpc_client: Arc<RpcClient>,
333        shutdown: Arc<Notify>,
334    ) {
335        let listen_interval = Duration::from_secs(30);
336
337        loop {
338            tokio::select! {
339                _ = shutdown.notified() => {
340                    info!("Listen loop shutdown");
341                    break;
342                }
343                _ = tokio::time::sleep(listen_interval) => {
344                    if listeners.listener_count() == 0 {
345                        continue;
346                    }
347
348                    // Build batch listen request
349                    let contexts = listeners.get_listen_contexts();
350                    if contexts.is_empty() {
351                        continue;
352                    }
353
354                    let mut request = ConfigBatchListenRequest::new(true);
355                    for (data_id, group, tenant, md5) in contexts {
356                        request = request.add_context(ConfigListenContext::new(
357                            &data_id, &group, &tenant, &md5,
358                        ));
359                    }
360
361                    // Send listen request
362                    match rpc_client.request::<_, ConfigChangeBatchListenResponse>(&request).await {
363                        Ok(response) => {
364                            for changed in response.changed_configs {
365                                debug!(
366                                    "Config changed: dataId={}, group={}, tenant={}",
367                                    changed.data_id, changed.group, changed.tenant
368                                );
369
370                                // Fetch new content
371                                let query_request = ConfigQueryRequest::new(
372                                    &changed.data_id,
373                                    &changed.group,
374                                    &changed.tenant,
375                                );
376
377                                match rpc_client.request::<_, ConfigQueryResponse>(&query_request).await {
378                                    Ok(query_response) => {
379                                        let old_content = cache
380                                            .get(&changed.data_id, &changed.group, &changed.tenant)
381                                            .map(|c| c.content);
382
383                                        let event = ConfigChangeEvent::new(
384                                            &changed.data_id,
385                                            &changed.group,
386                                            &changed.tenant,
387                                            old_content,
388                                            query_response.content.clone(),
389                                            ConfigChangeType::Modify,
390                                        );
391
392                                        // Update cache
393                                        let mut config = ConfigInfo::new(
394                                            &changed.data_id,
395                                            &changed.group,
396                                            &changed.tenant,
397                                        );
398                                        config.content = query_response.content;
399                                        config.md5 = query_response.md5.clone();
400                                        config.last_modified = query_response.last_modified;
401                                        cache.put(config);
402
403                                        // Update MD5 in listeners
404                                        listeners.set_md5(
405                                            &changed.data_id,
406                                            &changed.group,
407                                            &changed.tenant,
408                                            &query_response.md5,
409                                        );
410
411                                        // Notify listeners
412                                        listeners.notify_change(event).await;
413                                    }
414                                    Err(e) => {
415                                        error!("Failed to fetch changed config: {}", e);
416                                    }
417                                }
418                            }
419                        }
420                        Err(e) => {
421                            warn!("Config listen request failed: {}", e);
422                        }
423                    }
424                }
425            }
426        }
427    }
428}
429
430impl Drop for ConfigService {
431    fn drop(&mut self) {
432        self.shutdown.notify_one();
433    }
434}