batata_client/config/
config_service.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use parking_lot::RwLock;
5use tokio::sync::Notify;
6use tokio::task::JoinHandle;
7use tracing::{debug, error, info, warn};
8
9use crate::api::config::{
10    ConfigBatchListenRequest, ConfigChangeBatchListenResponse, ConfigInfo, ConfigListenContext,
11    ConfigPublishRequest, ConfigPublishResponse, ConfigQueryRequest, ConfigQueryResponse,
12    ConfigRemoveRequest, ConfigRemoveResponse, ConfigSearchRequest, ConfigSearchResponse,
13    ConfigSearchItem,
14};
15use crate::common::{md5_hash, DEFAULT_GROUP};
16use crate::config::{
17    CallbackListener, ConfigCache, ConfigChangeEvent, ConfigChangeType, ConfigListener,
18    ListenerRegistry,
19};
20use crate::error::{BatataError, Result};
21use crate::remote::RpcClient;
22
23/// Configuration service for managing configurations
24pub struct ConfigService {
25    /// RPC client for server communication
26    rpc_client: Arc<RpcClient>,
27
28    /// Local configuration cache
29    cache: Arc<ConfigCache>,
30
31    /// Listener registry
32    listeners: Arc<ListenerRegistry>,
33
34    /// Namespace
35    namespace: String,
36
37    /// Whether the service is started
38    started: Arc<RwLock<bool>>,
39
40    /// Background task handle
41    listen_task: Arc<RwLock<Option<JoinHandle<()>>>>,
42
43    /// Shutdown notify
44    shutdown: Arc<Notify>,
45}
46
47impl ConfigService {
48    /// Create a new ConfigService
49    pub fn new(rpc_client: Arc<RpcClient>, namespace: &str) -> Self {
50        Self {
51            rpc_client,
52            cache: Arc::new(ConfigCache::new()),
53            listeners: Arc::new(ListenerRegistry::new()),
54            namespace: namespace.to_string(),
55            started: Arc::new(RwLock::new(false)),
56            listen_task: Arc::new(RwLock::new(None)),
57            shutdown: Arc::new(Notify::new()),
58        }
59    }
60
61    /// Start the config service
62    pub async fn start(&self) -> Result<()> {
63        if *self.started.read() {
64            return Err(BatataError::ClientAlreadyStarted);
65        }
66
67        *self.started.write() = true;
68
69        // Start background listener task
70        let listeners = self.listeners.clone();
71        let cache = self.cache.clone();
72        let rpc_client = self.rpc_client.clone();
73        let shutdown = self.shutdown.clone();
74
75        let handle = tokio::spawn(async move {
76            Self::listen_loop(listeners, cache, rpc_client, shutdown).await;
77        });
78
79        *self.listen_task.write() = Some(handle);
80
81        info!("ConfigService started");
82        Ok(())
83    }
84
85    /// Stop the config service
86    pub async fn stop(&self) {
87        *self.started.write() = false;
88        self.shutdown.notify_one();
89
90        if let Some(handle) = self.listen_task.write().take() {
91            handle.abort();
92        }
93
94        info!("ConfigService stopped");
95    }
96
97    /// Get configuration
98    pub async fn get_config(&self, data_id: &str, group: &str) -> Result<String> {
99        self.get_config_with_timeout(data_id, group, 3000).await
100    }
101
102    /// Get configuration with timeout
103    pub async fn get_config_with_timeout(
104        &self,
105        data_id: &str,
106        group: &str,
107        _timeout_ms: u64,
108    ) -> Result<String> {
109        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
110
111        // Try cache first
112        if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
113            return Ok(config.content);
114        }
115
116        // Fetch from server
117        let request = ConfigQueryRequest::new(data_id, group, &self.namespace);
118
119        let response: ConfigQueryResponse = self.rpc_client.request(&request).await?;
120
121        if response.response.error_code == ConfigQueryResponse::CONFIG_NOT_FOUND {
122            return Err(BatataError::ConfigNotFound {
123                data_id: data_id.to_string(),
124                group: group.to_string(),
125                namespace: self.namespace.clone(),
126            });
127        }
128
129        // Update cache
130        let mut config = ConfigInfo::new(data_id, group, &self.namespace);
131        config.content = response.content.clone();
132        config.md5 = response.md5.clone();
133        config.last_modified = response.last_modified;
134        config.content_type = response.content_type.clone();
135        self.cache.put(config);
136
137        Ok(response.content)
138    }
139
140    /// Get configuration and sign listener
141    pub async fn get_config_and_sign_listener<L>(
142        &self,
143        data_id: &str,
144        group: &str,
145        listener: L,
146    ) -> Result<String>
147    where
148        L: ConfigListener + 'static,
149    {
150        let content = self.get_config(data_id, group).await?;
151
152        // Add listener
153        self.add_listener(data_id, group, listener);
154
155        // Update MD5 in listener registry
156        let md5 = md5_hash(&content);
157        self.listeners
158            .set_md5(data_id, group, &self.namespace, &md5);
159
160        Ok(content)
161    }
162
163    /// Publish configuration
164    pub async fn publish_config(&self, data_id: &str, group: &str, content: &str) -> Result<bool> {
165        self.publish_config_with_type(data_id, group, content, None)
166            .await
167    }
168
169    /// Publish configuration with type
170    pub async fn publish_config_with_type(
171        &self,
172        data_id: &str,
173        group: &str,
174        content: &str,
175        config_type: Option<&str>,
176    ) -> Result<bool> {
177        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
178
179        let mut request = ConfigPublishRequest::new(data_id, group, &self.namespace, content);
180
181        if let Some(t) = config_type {
182            request = request.with_type(t);
183        }
184
185        let response: ConfigPublishResponse = self.rpc_client.request(&request).await?;
186
187        if response.response.success {
188            // Update cache
189            let mut config = ConfigInfo::new(data_id, group, &self.namespace);
190            config.update_content(content);
191            self.cache.put(config);
192        }
193
194        Ok(response.response.success)
195    }
196
197    /// Remove configuration
198    pub async fn remove_config(&self, data_id: &str, group: &str) -> Result<bool> {
199        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
200
201        let request = ConfigRemoveRequest::new(data_id, group, &self.namespace);
202
203        let response: ConfigRemoveResponse = self.rpc_client.request(&request).await?;
204
205        if response.response.success {
206            self.cache.remove(data_id, group, &self.namespace);
207        }
208
209        Ok(response.response.success)
210    }
211
212    /// Add listener for configuration changes
213    pub fn add_listener<L>(&self, data_id: &str, group: &str, listener: L)
214    where
215        L: ConfigListener + 'static,
216    {
217        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
218
219        self.listeners
220            .add_listener(data_id, group, &self.namespace, Arc::new(listener));
221
222        // Set initial MD5 from cache
223        if let Some(config) = self.cache.get(data_id, group, &self.namespace) {
224            self.listeners
225                .set_md5(data_id, group, &self.namespace, &config.md5);
226        }
227    }
228
229    /// Add callback listener
230    pub fn add_callback_listener<F>(&self, data_id: &str, group: &str, callback: F)
231    where
232        F: Fn(ConfigChangeEvent) + Send + Sync + 'static,
233    {
234        self.add_listener(data_id, group, CallbackListener::new(callback));
235    }
236
237    /// Remove listener
238    pub fn remove_listener(&self, data_id: &str, group: &str) {
239        let group = if group.is_empty() { DEFAULT_GROUP } else { group };
240        self.listeners.remove_listener(data_id, group, &self.namespace);
241    }
242
243    /// Search configurations with pagination
244    pub async fn search_config(
245        &self,
246        data_id_pattern: &str,
247        group_pattern: &str,
248        page_no: i32,
249        page_size: i32,
250    ) -> Result<(i32, Vec<ConfigSearchItem>)> {
251        let request = ConfigSearchRequest::new(&self.namespace)
252            .with_data_id(data_id_pattern)
253            .with_group(group_pattern)
254            .with_page(page_no, page_size);
255
256        let response: ConfigSearchResponse = self.rpc_client.request(&request).await?;
257
258        Ok((response.total_count, response.page_items))
259    }
260
261    /// Search configurations with blur matching
262    pub async fn search_config_blur(
263        &self,
264        data_id_pattern: &str,
265        group_pattern: &str,
266    ) -> Result<Vec<ConfigSearchItem>> {
267        let (_, items) = self.search_config(data_id_pattern, group_pattern, 1, 1000).await?;
268        Ok(items)
269    }
270
271    /// Get server status
272    pub async fn get_server_status(&self) -> Result<String> {
273        if self.rpc_client.is_connected() {
274            Ok("UP".to_string())
275        } else {
276            Ok("DOWN".to_string())
277        }
278    }
279
280    /// Background listen loop for configuration changes
281    async fn listen_loop(
282        listeners: Arc<ListenerRegistry>,
283        cache: Arc<ConfigCache>,
284        rpc_client: Arc<RpcClient>,
285        shutdown: Arc<Notify>,
286    ) {
287        let listen_interval = Duration::from_secs(30);
288
289        loop {
290            tokio::select! {
291                _ = shutdown.notified() => {
292                    info!("Listen loop shutdown");
293                    break;
294                }
295                _ = tokio::time::sleep(listen_interval) => {
296                    if listeners.listener_count() == 0 {
297                        continue;
298                    }
299
300                    // Build batch listen request
301                    let contexts = listeners.get_listen_contexts();
302                    if contexts.is_empty() {
303                        continue;
304                    }
305
306                    let mut request = ConfigBatchListenRequest::new(true);
307                    for (data_id, group, tenant, md5) in contexts {
308                        request = request.add_context(ConfigListenContext::new(
309                            &data_id, &group, &tenant, &md5,
310                        ));
311                    }
312
313                    // Send listen request
314                    match rpc_client.request::<_, ConfigChangeBatchListenResponse>(&request).await {
315                        Ok(response) => {
316                            for changed in response.changed_configs {
317                                debug!(
318                                    "Config changed: dataId={}, group={}, tenant={}",
319                                    changed.data_id, changed.group, changed.tenant
320                                );
321
322                                // Fetch new content
323                                let query_request = ConfigQueryRequest::new(
324                                    &changed.data_id,
325                                    &changed.group,
326                                    &changed.tenant,
327                                );
328
329                                match rpc_client.request::<_, ConfigQueryResponse>(&query_request).await {
330                                    Ok(query_response) => {
331                                        let old_content = cache
332                                            .get(&changed.data_id, &changed.group, &changed.tenant)
333                                            .map(|c| c.content);
334
335                                        let event = ConfigChangeEvent::new(
336                                            &changed.data_id,
337                                            &changed.group,
338                                            &changed.tenant,
339                                            old_content,
340                                            query_response.content.clone(),
341                                            ConfigChangeType::Modify,
342                                        );
343
344                                        // Update cache
345                                        let mut config = ConfigInfo::new(
346                                            &changed.data_id,
347                                            &changed.group,
348                                            &changed.tenant,
349                                        );
350                                        config.content = query_response.content;
351                                        config.md5 = query_response.md5.clone();
352                                        config.last_modified = query_response.last_modified;
353                                        cache.put(config);
354
355                                        // Update MD5 in listeners
356                                        listeners.set_md5(
357                                            &changed.data_id,
358                                            &changed.group,
359                                            &changed.tenant,
360                                            &query_response.md5,
361                                        );
362
363                                        // Notify listeners
364                                        listeners.notify_change(event).await;
365                                    }
366                                    Err(e) => {
367                                        error!("Failed to fetch changed config: {}", e);
368                                    }
369                                }
370                            }
371                        }
372                        Err(e) => {
373                            warn!("Config listen request failed: {}", e);
374                        }
375                    }
376                }
377            }
378        }
379    }
380}
381
382impl Drop for ConfigService {
383    fn drop(&mut self) {
384        self.shutdown.notify_one();
385    }
386}