Skip to main content

dubbo_rs_metadata/
lib.rs

1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::{Arc, RwLock};
4use std::time::Duration;
5
6/// Streaming type for RPC methods.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8pub enum StreamType {
9    /// Unary call: single request → single response.
10    #[default]
11    Unary,
12    /// Client streaming: multiple requests → single response.
13    ClientStreaming,
14    /// Server streaming: single request → multiple responses.
15    ServerStreaming,
16    /// Bidirectional streaming: multiple requests ↔ multiple responses.
17    BidiStreaming,
18}
19
20/// Per-method metadata definition.
21///
22/// Describes a single RPC method: its name, parameter types (Java-style
23/// descriptors), return type, streaming type, and whether it is a one-way call.
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct MethodDefinition {
26    pub name: String,
27    pub parameter_types: Vec<String>,
28    pub return_type: String,
29    pub stream_type: StreamType,
30    pub oneway: bool,
31}
32
33impl MethodDefinition {
34    #[must_use]
35    pub fn new(name: impl Into<String>, return_type: impl Into<String>) -> Self {
36        Self {
37            name: name.into(),
38            parameter_types: Vec::new(),
39            return_type: return_type.into(),
40            stream_type: StreamType::default(),
41            oneway: false,
42        }
43    }
44
45    #[must_use]
46    pub fn with_param(mut self, param_type: impl Into<String>) -> Self {
47        self.parameter_types.push(param_type.into());
48        self
49    }
50
51    #[must_use]
52    pub fn with_stream_type(mut self, stream_type: StreamType) -> Self {
53        self.stream_type = stream_type;
54        self
55    }
56
57    #[must_use]
58    pub fn with_oneway(mut self, oneway: bool) -> Self {
59        self.oneway = oneway;
60        self
61    }
62
63    #[must_use]
64    pub fn is_streaming(&self) -> bool {
65        self.stream_type != StreamType::Unary
66    }
67}
68
69/// Per-service metadata definition.
70///
71/// Describes a single Dubbo service: its interface name, version, group,
72/// and the methods it exposes.
73#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct ServiceDefinition {
75    /// Fully-qualified interface name (e.g., "com.example.Greeter").
76    pub interface: String,
77    /// Service version (e.g., "1.0.0").
78    pub version: String,
79    /// Service group.
80    pub group: String,
81    /// Methods exposed by this service.
82    pub methods: Vec<MethodDefinition>,
83    /// Service-level parameters (key-value).
84    pub params: Vec<(String, String)>,
85}
86
87impl ServiceDefinition {
88    #[must_use]
89    pub fn new(interface: impl Into<String>) -> Self {
90        Self {
91            interface: interface.into(),
92            version: "1.0.0".into(),
93            group: String::new(),
94            methods: Vec::new(),
95            params: Vec::new(),
96        }
97    }
98
99    #[must_use]
100    pub fn with_version(mut self, version: impl Into<String>) -> Self {
101        self.version = version.into();
102        self
103    }
104
105    #[must_use]
106    pub fn with_group(mut self, group: impl Into<String>) -> Self {
107        self.group = group.into();
108        self
109    }
110
111    #[must_use]
112    pub fn with_method(mut self, method: MethodDefinition) -> Self {
113        self.methods.push(method);
114        self
115    }
116
117    #[must_use]
118    pub fn with_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
119        self.params.push((key.into(), value.into()));
120        self
121    }
122
123    /// Build a unique service key: `{group}/{interface}:{version}`.
124    #[must_use]
125    pub fn service_key(&self) -> String {
126        if self.group.is_empty() {
127            format!("{}:{}", self.interface, self.version)
128        } else {
129            format!("{}/{}:{}", self.group, self.interface, self.version)
130        }
131    }
132}
133
134/// Application-level metadata information.
135///
136/// Contains all service definitions for a single Dubbo application,
137/// along with the application name and a revision number for change
138/// detection.
139#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct MetadataInfo {
141    /// Application name.
142    pub application: String,
143    /// Revision number — incremented on metadata changes.
144    pub revision: u64,
145    /// All service definitions exported by this application.
146    pub services: Vec<ServiceDefinition>,
147    /// Arbitrary key-value attributes.
148    pub attributes: Vec<(String, String)>,
149}
150
151impl MetadataInfo {
152    #[must_use]
153    pub fn new(application: impl Into<String>) -> Self {
154        Self {
155            application: application.into(),
156            revision: 0,
157            services: Vec::new(),
158            attributes: Vec::new(),
159        }
160    }
161
162    #[must_use]
163    pub fn with_revision(mut self, revision: u64) -> Self {
164        self.revision = revision;
165        self
166    }
167
168    #[must_use]
169    pub fn with_service(mut self, service: ServiceDefinition) -> Self {
170        self.services.push(service);
171        self
172    }
173
174    #[must_use]
175    pub fn with_attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
176        self.attributes.push((key.into(), value.into()));
177        self
178    }
179
180    /// Find a service definition by interface name.
181    #[must_use]
182    pub fn find_service(&self, interface: &str) -> Option<&ServiceDefinition> {
183        self.services.iter().find(|s| s.interface == interface)
184    }
185
186    /// Increment revision and return a new `MetadataInfo` with updated revision.
187    #[must_use]
188    pub fn bump_revision(mut self) -> Self {
189        self.revision += 1;
190        self
191    }
192}
193
194/// Metadata storage abstraction.
195///
196/// Implementations may use in-memory storage, a remote metadata center,
197/// or a composite of both.
198pub trait MetadataStorage: Send + Sync {
199    /// Store metadata for a given application.
200    fn store(&self, metadata: MetadataInfo);
201
202    /// Retrieve metadata for a given application.
203    fn get(&self, application: &str) -> Option<MetadataInfo>;
204
205    /// Remove metadata for a given application.
206    fn remove(&self, application: &str) -> Option<MetadataInfo>;
207
208    /// List all known application names.
209    fn applications(&self) -> Vec<String>;
210}
211
212/// In-memory metadata storage backed by a concurrent hash map.
213///
214/// Suitable for local development and testing. Production deployments
215/// should use a remote metadata center for cross-node visibility.
216pub struct InMemoryMetadataStorage {
217    store: DashMap<String, MetadataInfo>,
218}
219
220impl InMemoryMetadataStorage {
221    #[must_use]
222    pub fn new() -> Self {
223        Self {
224            store: DashMap::new(),
225        }
226    }
227}
228
229impl Default for InMemoryMetadataStorage {
230    fn default() -> Self {
231        Self::new()
232    }
233}
234
235impl MetadataStorage for InMemoryMetadataStorage {
236    fn store(&self, metadata: MetadataInfo) {
237        self.store.insert(metadata.application.clone(), metadata);
238    }
239
240    fn get(&self, application: &str) -> Option<MetadataInfo> {
241        self.store.get(application).map(|entry| entry.clone())
242    }
243
244    fn remove(&self, application: &str) -> Option<MetadataInfo> {
245        self.store.remove(application).map(|(_, v)| v)
246    }
247
248    fn applications(&self) -> Vec<String> {
249        self.store.iter().map(|entry| entry.key().clone()).collect()
250    }
251}
252
253/// A no-op watcher for `ZooKeeper` connections.
254///
255/// Silently ignores all watched events. Used as the default watcher
256/// when establishing a ZK connection for metadata storage.
257struct NoopWatcher;
258
259impl zookeeper::Watcher for NoopWatcher {
260    fn handle(&self, _event: zookeeper::WatchedEvent) {}
261}
262
263/// ZooKeeper-backed metadata storage.
264///
265/// Stores `MetadataInfo` as JSON documents in `ZooKeeper`, organized under
266/// a configurable root path (default: `/dubbo/metadata`). Each application's
267/// metadata is stored at `{root_path}/{application}`.
268///
269/// Connections are established lazily on first access and reused across
270/// subsequent operations.
271pub struct ZkMetadataStorage {
272    zk_addr: String,
273    root_path: String,
274    zk: RwLock<Option<zookeeper::ZooKeeper>>,
275}
276
277impl ZkMetadataStorage {
278    /// Create a new `ZkMetadataStorage` pointing at the given `ZooKeeper` address.
279    ///
280    /// Uses `/dubbo/metadata` as the default root path.
281    #[must_use]
282    pub fn new(zk_addr: &str) -> Self {
283        Self {
284            zk_addr: zk_addr.to_string(),
285            root_path: "/dubbo/metadata".to_string(),
286            zk: RwLock::new(None),
287        }
288    }
289
290    /// Set a custom root path (builder pattern).
291    #[must_use]
292    pub fn with_root_path(mut self, path: &str) -> Self {
293        self.root_path = path.to_string();
294        self
295    }
296
297    /// Ensure a ZK connection exists, creating one lazily if needed.
298    fn ensure_connection(&self) -> Result<(), String> {
299        {
300            let guard = self.zk.read().map_err(|e| e.to_string())?;
301            if guard.is_some() {
302                return Ok(());
303            }
304        }
305
306        let mut guard = self.zk.write().map_err(|e| e.to_string())?;
307        if guard.is_some() {
308            return Ok(());
309        }
310
311        let zk = zookeeper::ZooKeeper::connect(&self.zk_addr, Duration::from_secs(5), NoopWatcher)
312            .map_err(|e| format!("ZK connect error: {e}"))?;
313
314        *guard = Some(zk);
315        Ok(())
316    }
317
318    /// Build the full ZK path for a given application name.
319    fn app_path(&self, app: &str) -> String {
320        format!("{}/{}", self.root_path, app)
321    }
322
323    /// Ensure all ancestor znodes exist along `path`, creating persistent
324    /// nodes where needed.
325    fn ensure_path(&self, path: &str) -> Result<(), String> {
326        let guard = self.zk.read().map_err(|e| e.to_string())?;
327        let zk = guard.as_ref().ok_or("ZK not connected")?;
328
329        let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
330        let mut current = String::new();
331
332        for part in &parts {
333            current.push('/');
334            current.push_str(part);
335
336            if zk
337                .exists(&current, false)
338                .map_err(|e| format!("ZK exists error for {current}: {e}"))?
339                .is_none()
340            {
341                zk.create(
342                    &current,
343                    Vec::new(),
344                    zookeeper::Acl::open_unsafe().clone(),
345                    zookeeper::CreateMode::Persistent,
346                )
347                .map_err(|e| format!("ZK create path error for {current}: {e}"))?;
348            }
349        }
350
351        Ok(())
352    }
353
354    /// Get a reference to the inner ZK client (must be called after `ensure_connection`).
355    fn with_zk<F, T>(&self, f: F) -> Result<T, String>
356    where
357        F: FnOnce(&zookeeper::ZooKeeper) -> Result<T, String>,
358    {
359        let guard = self.zk.read().map_err(|e| e.to_string())?;
360        let zk = guard.as_ref().ok_or("ZK not connected")?;
361        f(zk)
362    }
363}
364
365impl MetadataStorage for ZkMetadataStorage {
366    fn store(&self, metadata: MetadataInfo) {
367        if self.ensure_connection().is_err() {
368            return;
369        }
370
371        let path = self.app_path(&metadata.application);
372        if self.ensure_path(&path).is_err() {
373            return;
374        }
375
376        let Ok(json) = serde_json::to_vec(&metadata) else {
377            return;
378        };
379
380        let _ = self.with_zk(|zk| {
381            match zk.exists(&path, false) {
382                Ok(Some(_)) => {
383                    zk.set_data(&path, json, None)
384                        .map_err(|e| format!("ZK set_data error: {e}"))?;
385                }
386                Ok(None) => {
387                    zk.create(
388                        &path,
389                        json,
390                        zookeeper::Acl::open_unsafe().clone(),
391                        zookeeper::CreateMode::Persistent,
392                    )
393                    .map_err(|e| format!("ZK create error: {e}"))?;
394                }
395                Err(e) => return Err(format!("ZK exists error: {e}")),
396            }
397            Ok(())
398        });
399    }
400
401    fn get(&self, application: &str) -> Option<MetadataInfo> {
402        if self.ensure_connection().is_err() {
403            return None;
404        }
405
406        let path = self.app_path(application);
407
408        self.with_zk(|zk| {
409            let (data, _) = zk
410                .get_data(&path, false)
411                .map_err(|e| format!("ZK get_data error: {e}"))?;
412            let metadata: MetadataInfo = serde_json::from_slice(&data)
413                .map_err(|e| format!("JSON deserialize error: {e}"))?;
414            Ok(metadata)
415        })
416        .ok()
417    }
418
419    fn remove(&self, application: &str) -> Option<MetadataInfo> {
420        let removed = self.get(application)?;
421        let path = self.app_path(application);
422
423        let _ = self.with_zk(|zk| {
424            zk.delete(&path, None)
425                .map_err(|e| format!("ZK delete error: {e}"))
426        });
427
428        Some(removed)
429    }
430
431    fn applications(&self) -> Vec<String> {
432        if self.ensure_connection().is_err() {
433            return Vec::new();
434        }
435
436        self.with_zk(|zk| {
437            zk.get_children(&self.root_path, false)
438                .map_err(|e| format!("ZK get_children error: {e}"))
439        })
440        .unwrap_or_default()
441    }
442}
443
444/// Nacos-backed metadata storage.
445///
446/// Stores `MetadataInfo` as JSON documents in Nacos Config service,
447/// using the Nacos Open API (`/nacos/v1/cs/configs`).
448///
449/// Each application's metadata is stored as a config entry with:
450/// - `dataId`: `{data_id_prefix}{application}` (default: `dubbo.metadata.{app}`)
451/// - `group`: configurable (default: `dubbo`)
452/// - `namespaceId`: configurable (default: `public`)
453///
454/// Because Nacos lacks a direct config-list API, a local `DashMap` cache
455/// tracks known application names: `store()` adds to it, `remove()` removes
456/// from it, and `applications()` returns cached keys.
457pub struct NacosMetadataStorage {
458    server_addr: String,
459    namespace: String,
460    group: String,
461    data_id_prefix: String,
462    client: reqwest::blocking::Client,
463    known_apps: DashMap<String, ()>,
464}
465
466impl NacosMetadataStorage {
467    /// Create a new `NacosMetadataStorage` pointing at the given Nacos server address.
468    ///
469    /// Uses default values: `namespace=public`, `group=dubbo`,
470    /// `data_id_prefix=dubbo.metadata.`.
471    #[must_use]
472    pub fn new(server_addr: &str) -> Self {
473        Self {
474            server_addr: server_addr.to_string(),
475            namespace: "public".to_string(),
476            group: "dubbo".to_string(),
477            data_id_prefix: "dubbo.metadata.".to_string(),
478            client: reqwest::blocking::Client::new(),
479            known_apps: DashMap::new(),
480        }
481    }
482
483    /// Set a custom Nacos namespace (builder pattern).
484    #[must_use]
485    pub fn with_namespace(mut self, ns: &str) -> Self {
486        self.namespace = ns.to_string();
487        self
488    }
489
490    /// Set a custom Nacos group (builder pattern).
491    #[must_use]
492    pub fn with_group(mut self, group: &str) -> Self {
493        self.group = group.to_string();
494        self
495    }
496
497    /// Set a custom data ID prefix (builder pattern).
498    ///
499    /// Default is `"dubbo.metadata."`. The full data ID for an application
500    /// is `{prefix}{application_name}`.
501    #[must_use]
502    pub fn with_data_id_prefix(mut self, prefix: &str) -> Self {
503        self.data_id_prefix = prefix.to_string();
504        self
505    }
506
507    /// Build the Nacos data ID for a given application name.
508    ///
509    /// Returns `{data_id_prefix}{application}`.
510    #[must_use]
511    pub fn data_id_for(&self, app: &str) -> String {
512        format!("{}{}", self.data_id_prefix, app)
513    }
514}
515
516impl MetadataStorage for NacosMetadataStorage {
517    fn store(&self, metadata: MetadataInfo) {
518        let app = metadata.application.clone();
519        let data_id = self.data_id_for(&app);
520
521        let json = match serde_json::to_string(&metadata) {
522            Ok(data) => data,
523            Err(e) => {
524                tracing::warn!("Nacos store: JSON serialization failed for app '{app}': {e}");
525                return;
526            }
527        };
528
529        let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
530        let result = self
531            .client
532            .post(&url)
533            .form(&[
534                ("dataId", data_id.as_str()),
535                ("group", self.group.as_str()),
536                ("content", json.as_str()),
537                ("namespaceId", self.namespace.as_str()),
538                ("type", "json"),
539            ])
540            .send();
541
542        match result {
543            Ok(resp) if resp.status().is_success() => {
544                self.known_apps.insert(app, ());
545            }
546            Ok(resp) => {
547                tracing::warn!(
548                    "Nacos store: server returned status {} for app '{app}'",
549                    resp.status()
550                );
551            }
552            Err(e) => {
553                tracing::warn!("Nacos store: HTTP request failed for app '{app}': {e}");
554            }
555        }
556    }
557
558    fn get(&self, application: &str) -> Option<MetadataInfo> {
559        let data_id = self.data_id_for(application);
560        let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
561
562        let resp = self
563            .client
564            .get(&url)
565            .query(&[
566                ("dataId", data_id.as_str()),
567                ("group", self.group.as_str()),
568                ("namespaceId", self.namespace.as_str()),
569            ])
570            .send();
571
572        match resp {
573            Ok(resp) if resp.status().is_success() => match resp.text() {
574                Ok(text) if !text.is_empty() => serde_json::from_str::<MetadataInfo>(&text)
575                    .map_err(|e| {
576                        tracing::warn!(
577                            "Nacos get: JSON deserialization failed for app '{application}': {e}"
578                        );
579                        e
580                    })
581                    .ok(),
582                _ => None,
583            },
584            Ok(resp) => {
585                tracing::warn!(
586                    "Nacos get: server returned status {} for app '{application}'",
587                    resp.status()
588                );
589                None
590            }
591            Err(e) => {
592                tracing::warn!("Nacos get: HTTP request failed for app '{application}': {e}");
593                None
594            }
595        }
596    }
597
598    fn remove(&self, application: &str) -> Option<MetadataInfo> {
599        let existing = self.get(application)?;
600
601        let data_id = self.data_id_for(application);
602        let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
603
604        let result = self
605            .client
606            .delete(&url)
607            .query(&[
608                ("dataId", data_id.as_str()),
609                ("group", self.group.as_str()),
610                ("namespaceId", self.namespace.as_str()),
611            ])
612            .send();
613
614        match result {
615            Ok(resp) if resp.status().is_success() => {
616                self.known_apps.remove(application);
617            }
618            Ok(resp) => {
619                tracing::warn!(
620                    "Nacos remove: server returned status {} for app '{application}'",
621                    resp.status()
622                );
623            }
624            Err(e) => {
625                tracing::warn!("Nacos remove: HTTP request failed for app '{application}': {e}");
626            }
627        }
628
629        Some(existing)
630    }
631
632    fn applications(&self) -> Vec<String> {
633        self.known_apps.iter().map(|e| e.key().clone()).collect()
634    }
635}
636
637/// The standard Dubbo MetadataService interface.
638///
639/// Every Dubbo provider that supports application-level service
640/// discovery must export a MetadataService. Consumers query this
641/// service to obtain the provider's service definitions.
642#[async_trait::async_trait]
643pub trait MetadataService: Send + Sync {
644    /// Get the full metadata info for a given application.
645    async fn get_metadata_info(&self, application: String) -> Option<MetadataInfo>;
646
647    /// Get the JSON-encoded service definition for a given service path.
648    async fn get_service_definition(&self, path: String) -> Option<String>;
649
650    /// Get all exported service URLs (Dubbo URL format) for an application.
651    async fn get_exported_service_urls(&self, application: String) -> Vec<String>;
652
653    /// Health-check: always returns "true".
654    async fn echo(&self, msg: String) -> String;
655}
656
657/// A concrete implementation of `MetadataService` backed by a `MetadataStorage`.
658pub struct DefaultMetadataService {
659    storage: Arc<dyn MetadataStorage>,
660}
661
662impl DefaultMetadataService {
663    #[must_use]
664    pub fn new(storage: Arc<dyn MetadataStorage>) -> Self {
665        Self { storage }
666    }
667}
668
669#[async_trait::async_trait]
670impl MetadataService for DefaultMetadataService {
671    async fn get_metadata_info(&self, application: String) -> Option<MetadataInfo> {
672        self.storage.get(&application)
673    }
674
675    async fn get_service_definition(&self, path: String) -> Option<String> {
676        for app in self.storage.applications() {
677            if let Some(meta) = self.storage.get(&app) {
678                for svc in &meta.services {
679                    if svc.interface == path {
680                        return serde_json::to_string(svc).ok();
681                    }
682                }
683            }
684        }
685        None
686    }
687
688    async fn get_exported_service_urls(&self, application: String) -> Vec<String> {
689        self.storage
690            .get(&application)
691            .map(|meta| {
692                meta.services
693                    .iter()
694                    .map(|svc| format!("{}:{}", svc.interface, svc.version))
695                    .collect()
696            })
697            .unwrap_or_default()
698    }
699
700    async fn echo(&self, msg: String) -> String {
701        msg
702    }
703}
704
705#[cfg(test)]
706mod tests {
707    use super::*;
708
709    #[test]
710    fn test_method_definition_builder() {
711        let method = MethodDefinition::new("sayHello", "Ljava/lang/String;")
712            .with_param("Ljava/lang/String;")
713            .with_oneway(false);
714
715        assert_eq!(method.name, "sayHello");
716        assert_eq!(method.return_type, "Ljava/lang/String;");
717        assert_eq!(method.parameter_types, vec!["Ljava/lang/String;"]);
718        assert!(!method.oneway);
719    }
720
721    #[test]
722    fn test_method_definition_oneway() {
723        let method = MethodDefinition::new("notify", "V").with_oneway(true);
724
725        assert!(method.oneway);
726        assert_eq!(method.return_type, "V");
727    }
728
729    #[test]
730    fn test_service_definition_builder() {
731        let svc = ServiceDefinition::new("com.example.Greeter")
732            .with_version("1.0.0")
733            .with_group("default")
734            .with_method(
735                MethodDefinition::new("sayHello", "Ljava/lang/String;")
736                    .with_param("Ljava/lang/String;"),
737            )
738            .with_method(MethodDefinition::new("echo", "Ljava/lang/String;"))
739            .with_param("timeout", "3000");
740
741        assert_eq!(svc.interface, "com.example.Greeter");
742        assert_eq!(svc.version, "1.0.0");
743        assert_eq!(svc.group, "default");
744        assert_eq!(svc.methods.len(), 2);
745        assert_eq!(svc.params.len(), 1);
746    }
747
748    #[test]
749    fn test_service_key_format() {
750        let svc = ServiceDefinition::new("com.example.Greeter").with_version("1.0.0");
751        assert_eq!(svc.service_key(), "com.example.Greeter:1.0.0");
752
753        let svc_grouped = ServiceDefinition::new("com.example.Greeter")
754            .with_version("1.0.0")
755            .with_group("dev");
756        assert_eq!(svc_grouped.service_key(), "dev/com.example.Greeter:1.0.0");
757    }
758
759    #[test]
760    fn test_metadata_info_builder() {
761        let meta = MetadataInfo::new("demo-provider")
762            .with_revision(3)
763            .with_service(
764                ServiceDefinition::new("com.example.Greeter")
765                    .with_method(MethodDefinition::new("sayHello", "V")),
766            )
767            .with_attr("owner", "team-a");
768
769        assert_eq!(meta.application, "demo-provider");
770        assert_eq!(meta.revision, 3);
771        assert_eq!(meta.services.len(), 1);
772        assert_eq!(meta.attributes.len(), 1);
773    }
774
775    #[test]
776    fn test_metadata_info_find_service() {
777        let meta = MetadataInfo::new("demo-provider")
778            .with_service(ServiceDefinition::new("com.example.Greeter"))
779            .with_service(ServiceDefinition::new("com.example.UserService"));
780
781        assert!(meta.find_service("com.example.Greeter").is_some());
782        assert!(meta.find_service("com.example.UserService").is_some());
783        assert!(meta.find_service("com.example.Unknown").is_none());
784    }
785
786    #[test]
787    fn test_metadata_info_bump_revision() {
788        let meta = MetadataInfo::new("demo-provider")
789            .with_revision(0)
790            .bump_revision();
791
792        assert_eq!(meta.revision, 1);
793    }
794
795    #[test]
796    fn test_in_memory_storage_store_and_get() {
797        let storage = InMemoryMetadataStorage::new();
798
799        let meta = MetadataInfo::new("demo-provider")
800            .with_service(ServiceDefinition::new("com.example.Greeter"));
801
802        storage.store(meta.clone());
803
804        let retrieved = storage.get("demo-provider");
805        assert!(retrieved.is_some());
806        assert_eq!(retrieved.unwrap().application, "demo-provider");
807    }
808
809    #[test]
810    fn test_in_memory_storage_get_nonexistent() {
811        let storage = InMemoryMetadataStorage::new();
812        assert!(storage.get("unknown-app").is_none());
813    }
814
815    #[test]
816    fn test_in_memory_storage_remove() {
817        let storage = InMemoryMetadataStorage::new();
818        let meta = MetadataInfo::new("demo-provider");
819        storage.store(meta);
820
821        let removed = storage.remove("demo-provider");
822        assert!(removed.is_some());
823
824        assert!(storage.get("demo-provider").is_none());
825    }
826
827    #[test]
828    fn test_in_memory_storage_applications() {
829        let storage = InMemoryMetadataStorage::new();
830
831        storage.store(MetadataInfo::new("app-a"));
832        storage.store(MetadataInfo::new("app-b"));
833        storage.store(MetadataInfo::new("app-c"));
834
835        let mut apps = storage.applications();
836        apps.sort();
837        assert_eq!(apps, vec!["app-a", "app-b", "app-c"]);
838    }
839
840    #[test]
841    fn test_in_memory_storage_overwrite() {
842        let storage = InMemoryMetadataStorage::new();
843
844        let meta_v1 = MetadataInfo::new("demo-provider").with_revision(1);
845        storage.store(meta_v1);
846
847        let meta_v2 = MetadataInfo::new("demo-provider").with_revision(2);
848        storage.store(meta_v2);
849
850        let retrieved = storage.get("demo-provider").unwrap();
851        assert_eq!(retrieved.revision, 2);
852    }
853
854    #[test]
855    fn test_metadata_service_get_metadata_info() {
856        let storage = Arc::new(InMemoryMetadataStorage::new());
857        storage.store(
858            MetadataInfo::new("demo-provider")
859                .with_service(ServiceDefinition::new("com.example.Greeter")),
860        );
861
862        let service = DefaultMetadataService::new(storage);
863
864        let rt = tokio::runtime::Runtime::new().unwrap();
865        let result = rt.block_on(service.get_metadata_info("demo-provider".into()));
866        assert!(result.is_some());
867        assert_eq!(result.unwrap().services.len(), 1);
868    }
869
870    #[test]
871    fn test_metadata_service_echo() {
872        let storage = Arc::new(InMemoryMetadataStorage::new());
873        let service = DefaultMetadataService::new(storage);
874
875        let rt = tokio::runtime::Runtime::new().unwrap();
876        let result = rt.block_on(service.echo("ping".into()));
877        assert_eq!(result, "ping");
878    }
879
880    #[test]
881    fn test_metadata_service_get_missing_metadata() {
882        let storage = Arc::new(InMemoryMetadataStorage::new());
883        let service = DefaultMetadataService::new(storage);
884
885        let rt = tokio::runtime::Runtime::new().unwrap();
886        let result = rt.block_on(service.get_metadata_info("unknown".into()));
887        assert!(result.is_none());
888    }
889
890    #[test]
891    fn test_metadata_service_get_exported_urls() {
892        let storage = Arc::new(InMemoryMetadataStorage::new());
893        storage.store(
894            MetadataInfo::new("demo-provider")
895                .with_service(ServiceDefinition::new("com.example.Greeter").with_version("1.0.0"))
896                .with_service(
897                    ServiceDefinition::new("com.example.UserService").with_version("2.0.0"),
898                ),
899        );
900
901        let service = DefaultMetadataService::new(storage);
902
903        let rt = tokio::runtime::Runtime::new().unwrap();
904        let urls = rt.block_on(service.get_exported_service_urls("demo-provider".into()));
905        assert_eq!(urls.len(), 2);
906        assert!(urls.contains(&"com.example.Greeter:1.0.0".to_string()));
907        assert!(urls.contains(&"com.example.UserService:2.0.0".to_string()));
908    }
909
/// A stored service can be fetched by interface name, and the returned text
/// mentions both the interface and its version.
#[test]
fn test_metadata_service_get_service_definition() {
    let greeter = ServiceDefinition::new("com.example.Greeter")
        .with_version("1.0.0")
        .with_method(MethodDefinition::new("sayHello", "Ljava/lang/String;"));
    // Capture the lookup key before the definition is moved into the metadata.
    let interface_name = greeter.interface.clone();

    let backend = Arc::new(InMemoryMetadataStorage::new());
    backend.store(MetadataInfo::new("demo-provider").with_service(greeter));
    let service = DefaultMetadataService::new(backend);

    let runtime = tokio::runtime::Runtime::new().unwrap();
    let definition = runtime.block_on(service.get_service_definition(interface_name));
    assert!(definition.is_some());

    let rendered = definition.unwrap();
    for needle in ["com.example.Greeter", "1.0.0"] {
        assert!(rendered.contains(needle));
    }
}
930
/// Serialising `MetadataInfo` to a JSON string and back keeps the application
/// name, the revision, and the number of services.
#[test]
fn test_serde_roundtrip_metadata_info() {
    let greeter = ServiceDefinition::new("com.example.Greeter")
        .with_method(MethodDefinition::new("sayHello", "V"));
    let original = MetadataInfo::new("demo-provider")
        .with_revision(5)
        .with_service(greeter);

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: MetadataInfo = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded.application, original.application);
    assert_eq!(decoded.revision, original.revision);
    assert_eq!(decoded.services.len(), original.services.len());
}
947
/// Without an explicit group the `group` field is empty and the service key
/// is just `interface:version`.
#[test]
fn test_service_definition_empty_group() {
    let definition = ServiceDefinition::new("com.example.Foo").with_version("2.0");

    assert_eq!(definition.group, "");

    let key = definition.service_key();
    assert_eq!(key, "com.example.Foo:2.0");
}
954
/// A freshly constructed method definition carries no parameter types.
#[test]
fn test_method_definition_empty_params() {
    let ping = MethodDefinition::new("ping", "V");
    assert_eq!(ping.parameter_types.len(), 0);
}
960
/// `MethodDefinition::new` defaults to a unary, non-streaming call.
#[test]
fn test_stream_type_default_is_unary() {
    let say_hello = MethodDefinition::new("sayHello", "V");

    let streaming = say_hello.is_streaming();
    assert_eq!(say_hello.stream_type, StreamType::Unary);
    assert!(!streaming);
}
967
/// Setting a client-streaming type is reflected in both the field and `is_streaming`.
#[test]
fn test_method_definition_streaming() {
    let base = MethodDefinition::new("upload", "V");
    let upload = base.with_stream_type(StreamType::ClientStreaming);

    assert_eq!(upload.stream_type, StreamType::ClientStreaming);
    assert!(upload.is_streaming());
}
975
/// Table-driven check: only `Unary` is reported as non-streaming.
#[test]
fn test_stream_type_all_variants() {
    // (stream type, expected `is_streaming()` answer)
    let cases = [
        (StreamType::Unary, false),
        (StreamType::ClientStreaming, true),
        (StreamType::ServerStreaming, true),
        (StreamType::BidiStreaming, true),
    ];

    cases.iter().copied().for_each(|(kind, should_stream)| {
        let probe = MethodDefinition::new("m", "V").with_stream_type(kind);
        assert_eq!(probe.is_streaming(), should_stream);
    });
}
990
/// Construction records the address, uses the default root path, and leaves
/// the `zk` slot empty.
#[test]
fn test_zk_metadata_storage_new() {
    let zk_storage = ZkMetadataStorage::new("127.0.0.1:2181");

    assert_eq!(zk_storage.zk_addr, "127.0.0.1:2181");
    assert_eq!(zk_storage.root_path, "/dubbo/metadata");

    let zk_slot = zk_storage.zk.read().unwrap();
    assert!(zk_slot.is_none());
}
998
/// `with_root_path` overrides the default root path.
#[test]
fn test_zk_metadata_storage_with_root_path() {
    let default_storage = ZkMetadataStorage::new("127.0.0.1:2181");
    let customised = default_storage.with_root_path("/custom/metadata");

    assert_eq!(customised.root_path, "/custom/metadata");
}
1004
/// `app_path` joins the root path and the application name with a `/`,
/// for both the default and a custom root.
#[test]
fn test_zk_metadata_storage_app_path() {
    let with_default_root = ZkMetadataStorage::new("127.0.0.1:2181");
    let default_path = with_default_root.app_path("demo-provider");
    assert_eq!(default_path, "/dubbo/metadata/demo-provider");

    let with_custom_root = ZkMetadataStorage::new("127.0.0.1:2181").with_root_path("/myapp/meta");
    let custom_path = with_custom_root.app_path("my-app");
    assert_eq!(custom_path, "/myapp/meta/my-app");
}
1016
/// Metadata encoded to JSON bytes decodes back with the same application,
/// revision, service list, and attribute count.
#[test]
fn test_zk_metadata_store_serialization() {
    let say_hello = MethodDefinition::new("sayHello", "Ljava/lang/String;")
        .with_param("Ljava/lang/String;");
    let greeter = ServiceDefinition::new("com.example.Greeter")
        .with_version("1.0.0")
        .with_method(say_hello);
    let original = MetadataInfo::new("demo-provider")
        .with_revision(3)
        .with_service(greeter)
        .with_attr("owner", "team-a");

    let bytes = serde_json::to_vec(&original).unwrap();
    let decoded: MetadataInfo = serde_json::from_slice(&bytes).unwrap();

    assert_eq!(decoded.application, "demo-provider");
    assert_eq!(decoded.revision, 3);
    assert_eq!(decoded.services.len(), 1);
    assert_eq!(decoded.services[0].interface, "com.example.Greeter");
    assert_eq!(decoded.attributes.len(), 1);
}
1040
/// `get` against an address that cannot be valid returns `None`.
#[test]
fn test_zk_metadata_get_without_connection_returns_none() {
    // Octets above 255 and a port above 65535: this address is never reachable.
    let unreachable = ZkMetadataStorage::new("256.256.256.256:99999");

    let fetched = unreachable.get("any-app");
    assert!(fetched.is_none());
}
1047
/// `remove` against an address that cannot be valid returns `None`.
#[test]
fn test_zk_metadata_remove_without_connection_returns_none() {
    // Octets above 255 and a port above 65535: this address is never reachable.
    let unreachable = ZkMetadataStorage::new("256.256.256.256:99999");

    let removed = unreachable.remove("any-app");
    assert!(removed.is_none());
}
1054
/// Full store / get / list / remove cycle against the in-memory backend.
#[test]
fn test_in_memory_store_still_works() {
    let backend = InMemoryMetadataStorage::new();
    let verify_service = ServiceDefinition::new("com.example.VerifyService");
    let metadata = MetadataInfo::new("verify-app")
        .with_revision(42)
        .with_service(verify_service);
    backend.store(metadata);

    // Stored entry is readable with its fields intact.
    let fetched = backend.get("verify-app").unwrap();
    assert_eq!(fetched.application, "verify-app");
    assert_eq!(fetched.revision, 42);

    // The application is listed.
    let listed = backend.applications();
    assert_eq!(listed, vec!["verify-app"]);

    // Removal hands back the entry and makes it unreadable afterwards.
    let evicted = backend.remove("verify-app").unwrap();
    assert_eq!(evicted.application, "verify-app");
    assert!(backend.get("verify-app").is_none());
}
1074
/// A fully populated `MetadataInfo` compares equal to itself after a trip
/// through JSON bytes.
#[test]
fn test_metadata_info_json_roundtrip() {
    let process = MethodDefinition::new("process", "V")
        .with_param("Ljava/lang/String;")
        .with_stream_type(StreamType::BidiStreaming);
    let svc = ServiceDefinition::new("com.example.Svc")
        .with_version("2.0.0")
        .with_group("prod")
        .with_method(process)
        .with_param("timeout", "5000");
    let original = MetadataInfo::new("roundtrip-app")
        .with_revision(7)
        .with_service(svc)
        .with_attr("env", "production");

    let encoded = serde_json::to_vec(&original).unwrap();
    let decoded: MetadataInfo = serde_json::from_slice(&encoded).unwrap();

    assert_eq!(decoded, original);
}
1097
/// Consumes one complete HTTP/1.1 request from `stream` and discards it.
///
/// A single `read` may return only part of a request (clients often send the
/// header section and the body in separate writes). Replying and closing while
/// request bytes are still unread can reset the connection on the client side,
/// so the request is read in two phases: up to the blank line that ends the
/// header section, then as many body bytes as `Content-Length` declares.
///
/// Best effort by design: EOF, a read error, or the read timeout simply ends
/// the drain so the caller can still send its canned response.
fn drain_http_request(stream: &mut std::net::TcpStream) {
    use std::io::Read;

    // A stalled client must not pin the detached server thread forever.
    let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));

    let mut request: Vec<u8> = Vec::new();
    let mut chunk = [0u8; 4096];

    // Phase 1: read until the end of the header section.
    let header_end = loop {
        if let Some(pos) = request.windows(4).position(|w| w == b"\r\n\r\n") {
            break pos + 4;
        }
        match stream.read(&mut chunk) {
            Ok(0) | Err(_) => return,
            Ok(n) => request.extend_from_slice(&chunk[..n]),
        }
    };

    // Header names are case-insensitive, so compare on a lowercased copy.
    let head = String::from_utf8_lossy(&request[..header_end]).to_ascii_lowercase();

    // A client that sent `Expect: 100-continue` is waiting for us before it
    // sends the body; do not wait for bytes that will never arrive.
    let awaits_continue = head
        .lines()
        .any(|line| line.starts_with("expect:") && line.contains("100-continue"));

    let declared_len = if awaits_continue {
        0
    } else {
        head.lines()
            .filter_map(|line| line.strip_prefix("content-length:"))
            .find_map(|value| value.trim().parse::<usize>().ok())
            .unwrap_or(0)
    };

    // Phase 2: read the rest of the declared body, if any.
    let expected_total = header_end.saturating_add(declared_len);
    while request.len() < expected_total {
        match stream.read(&mut chunk) {
            Ok(0) | Err(_) => break,
            Ok(n) => request.extend_from_slice(&chunk[..n]),
        }
    }
}

/// Binds a loopback listener on an ephemeral port and spawns a detached thread
/// that answers one connection per item of `responses`, in order.
///
/// Each connection gets a `text/plain` response with the given status code and
/// body and `Connection: close`. The reason phrase is always `OK`, whatever the
/// status code. The thread exits when `responses` is exhausted or `accept`
/// fails.
///
/// Returns the base URL (`http://127.0.0.1:<port>`).
///
/// # Panics
///
/// Panics if the listener cannot be bound or its local address cannot be read.
fn spawn_mock_http_server<I>(responses: I) -> String
where
    I: IntoIterator<Item = (u16, String)> + Send + 'static,
{
    use std::io::Write;

    let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
    let addr = listener.local_addr().unwrap();

    std::thread::spawn(move || {
        for (status, body) in responses {
            match listener.accept() {
                Ok((mut stream, _)) => {
                    drain_http_request(&mut stream);
                    let resp = format!(
                        "HTTP/1.1 {status} OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
                        body.len()
                    );
                    // Write failures only mean the client went away; ignore them.
                    let _ = stream.write_all(resp.as_bytes());
                    let _ = stream.flush();
                }
                Err(_) => break,
            }
        }
    });

    // No start-up sleep is needed: the listener is bound and listening before
    // the thread is spawned, so early connections wait in the accept backlog.
    format!("http://{addr}")
}

/// Starts a mock HTTP server that answers up to ten connections, each with the
/// same `status` and `body`, and returns its base URL.
fn start_mock_http_server(status: u16, body: impl Into<String>) -> String {
    let body = body.into();
    spawn_mock_http_server(std::iter::repeat((status, body)).take(10))
}

/// Starts a mock HTTP server that answers successive connections with the
/// given `(status, body)` pairs, in order, and returns its base URL.
fn start_multi_mock_http_server(responses: Vec<(u16, String)>) -> String {
    spawn_mock_http_server(responses)
}
1152
/// Construction records the server address, applies the default namespace,
/// group and data-id prefix, and starts with no known applications.
#[test]
fn test_nacos_metadata_storage_new() {
    let nacos = NacosMetadataStorage::new("http://127.0.0.1:8848");

    // Explicitly supplied value.
    assert_eq!(nacos.server_addr, "http://127.0.0.1:8848");

    // Defaults.
    assert_eq!(nacos.namespace, "public");
    assert_eq!(nacos.group, "dubbo");
    assert_eq!(nacos.data_id_prefix, "dubbo.metadata.");

    // Application-name cache starts empty.
    assert!(nacos.known_apps.is_empty());
}
1162
/// The `with_*` builders can be chained and each one overrides its own field.
#[test]
fn test_nacos_metadata_storage_builder_chaining() {
    let defaults = NacosMetadataStorage::new("http://nacos:8848");
    let configured = defaults
        .with_namespace("dev")
        .with_group("my-group")
        .with_data_id_prefix("custom.");

    assert_eq!(configured.namespace, "dev");
    assert_eq!(configured.group, "my-group");
    assert_eq!(configured.data_id_prefix, "custom.");
}
1173
/// Pins the default namespace, group and data-id prefix of a new storage.
#[test]
fn test_nacos_metadata_storage_default_values() {
    let untouched = NacosMetadataStorage::new("http://127.0.0.1:8848");

    assert_eq!(untouched.namespace, "public");
    assert_eq!(untouched.group, "dubbo");
    assert_eq!(untouched.data_id_prefix, "dubbo.metadata.");
}
1181
/// `data_id_for` is the configured prefix followed by the application name.
#[test]
fn test_nacos_metadata_storage_data_id_format() {
    let with_default_prefix = NacosMetadataStorage::new("http://127.0.0.1:8848");
    let default_id = with_default_prefix.data_id_for("demo-provider");
    assert_eq!(default_id, "dubbo.metadata.demo-provider");

    let with_custom_prefix =
        NacosMetadataStorage::new("http://127.0.0.1:8848").with_data_id_prefix("meta.");
    let custom_id = with_custom_prefix.data_id_for("my-app");
    assert_eq!(custom_id, "meta.my-app");
}
1194
/// Metadata compares equal to itself after a trip through a JSON string.
#[test]
fn test_nacos_metadata_store_serialization_roundtrip() {
    let say_hello = MethodDefinition::new("sayHello", "Ljava/lang/String;");
    let greeter = ServiceDefinition::new("com.example.Greeter")
        .with_version("1.0.0")
        .with_method(say_hello);
    let original = MetadataInfo::new("demo-provider")
        .with_revision(3)
        .with_service(greeter)
        .with_attr("owner", "team-a");

    let encoded = serde_json::to_string(&original).unwrap();
    let decoded: MetadataInfo = serde_json::from_str(&encoded).unwrap();

    assert_eq!(decoded, original);
}
1210
/// `get` returns `None` when the configured server address does not answer.
#[test]
fn test_nacos_metadata_get_without_server_returns_none() {
    // Port 1 on loopback: no mock server is started for this test.
    let offline = NacosMetadataStorage::new("http://127.0.0.1:1");

    let fetched = offline.get("any-app");
    assert!(fetched.is_none());
}
1217
/// `remove` returns `None` when the configured server address does not answer.
#[test]
fn test_nacos_metadata_remove_without_server_returns_none() {
    // Port 1 on loopback: no mock server is started for this test.
    let offline = NacosMetadataStorage::new("http://127.0.0.1:1");

    let removed = offline.remove("any-app");
    assert!(removed.is_none());
}
1224
/// `applications` reflects exactly what is in the `known_apps` cache, both
/// after insertions and after a removal.
#[test]
fn test_nacos_metadata_applications_cache() {
    let nacos = NacosMetadataStorage::new("http://127.0.0.1:1");

    // Seed the cache directly; no server round-trip is involved.
    for name in ["app-a", "app-b", "app-c"] {
        nacos.known_apps.insert(name.to_string(), ());
    }

    // Sort before comparing so the check does not depend on listing order.
    let mut listed = nacos.applications();
    listed.sort();
    assert_eq!(listed, vec!["app-a", "app-b", "app-c"]);

    nacos.known_apps.remove("app-b");

    let mut listed_after_removal = nacos.applications();
    listed_after_removal.sort();
    assert_eq!(listed_after_removal, vec!["app-a", "app-c"]);
}
1242
/// After `store` against a mock server answering `200 true`, the application
/// name is listed by `applications`.
#[test]
fn test_nacos_metadata_store_caches_app_name() {
    let greeter = ServiceDefinition::new("com.example.Greeter");
    let metadata = MetadataInfo::new("demo-provider")
        .with_revision(1)
        .with_service(greeter);

    let mock_url = start_mock_http_server(200, "true");
    let nacos = NacosMetadataStorage::new(&mock_url);
    nacos.store(metadata);

    let listed = nacos.applications();
    assert_eq!(listed, vec!["demo-provider"]);
}
1257
/// `remove` returns the metadata served by the mock and drops the application
/// from the `known_apps` cache.
#[test]
fn test_nacos_metadata_remove_clears_cache() {
    let greeter = ServiceDefinition::new("com.example.Greeter");
    let served = MetadataInfo::new("demo-provider")
        .with_revision(1)
        .with_service(greeter);
    let served_json = serde_json::to_string(&served).unwrap();

    // The mock answers two connections in order: the metadata JSON, then "true".
    let canned = vec![(200, served_json), (200, "true".to_string())];
    let mock_url = start_multi_mock_http_server(canned);

    let nacos = NacosMetadataStorage::new(&mock_url);
    nacos.known_apps.insert("demo-provider".to_string(), ());

    let evicted = nacos.remove("demo-provider");
    assert!(evicted.is_some());
    assert_eq!(evicted.unwrap().application, "demo-provider");

    let remaining = nacos.applications();
    assert!(remaining.is_empty());
}
1277
/// `with_namespace` overrides the namespace.
#[test]
fn test_nacos_metadata_storage_with_namespace() {
    let defaults = NacosMetadataStorage::new("http://nacos:8848");
    let in_production = defaults.with_namespace("production");

    assert_eq!(in_production.namespace, "production");
}
1283
/// `with_group` overrides the group.
#[test]
fn test_nacos_metadata_storage_with_group() {
    let defaults = NacosMetadataStorage::new("http://nacos:8848");
    let regrouped = defaults.with_group("custom-group");

    assert_eq!(regrouped.group, "custom-group");
}
1289}