1use dashmap::DashMap;
2use serde::{Deserialize, Serialize};
3use std::sync::{Arc, RwLock};
4use std::time::Duration;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
8pub enum StreamType {
9 #[default]
11 Unary,
12 ClientStreaming,
14 ServerStreaming,
16 BidiStreaming,
18}
19
20#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25pub struct MethodDefinition {
26 pub name: String,
27 pub parameter_types: Vec<String>,
28 pub return_type: String,
29 pub stream_type: StreamType,
30 pub oneway: bool,
31}
32
33impl MethodDefinition {
34 #[must_use]
35 pub fn new(name: impl Into<String>, return_type: impl Into<String>) -> Self {
36 Self {
37 name: name.into(),
38 parameter_types: Vec::new(),
39 return_type: return_type.into(),
40 stream_type: StreamType::default(),
41 oneway: false,
42 }
43 }
44
45 #[must_use]
46 pub fn with_param(mut self, param_type: impl Into<String>) -> Self {
47 self.parameter_types.push(param_type.into());
48 self
49 }
50
51 #[must_use]
52 pub fn with_stream_type(mut self, stream_type: StreamType) -> Self {
53 self.stream_type = stream_type;
54 self
55 }
56
57 #[must_use]
58 pub fn with_oneway(mut self, oneway: bool) -> Self {
59 self.oneway = oneway;
60 self
61 }
62
63 #[must_use]
64 pub fn is_streaming(&self) -> bool {
65 self.stream_type != StreamType::Unary
66 }
67}
68
69#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
74pub struct ServiceDefinition {
75 pub interface: String,
77 pub version: String,
79 pub group: String,
81 pub methods: Vec<MethodDefinition>,
83 pub params: Vec<(String, String)>,
85}
86
87impl ServiceDefinition {
88 #[must_use]
89 pub fn new(interface: impl Into<String>) -> Self {
90 Self {
91 interface: interface.into(),
92 version: "1.0.0".into(),
93 group: String::new(),
94 methods: Vec::new(),
95 params: Vec::new(),
96 }
97 }
98
99 #[must_use]
100 pub fn with_version(mut self, version: impl Into<String>) -> Self {
101 self.version = version.into();
102 self
103 }
104
105 #[must_use]
106 pub fn with_group(mut self, group: impl Into<String>) -> Self {
107 self.group = group.into();
108 self
109 }
110
111 #[must_use]
112 pub fn with_method(mut self, method: MethodDefinition) -> Self {
113 self.methods.push(method);
114 self
115 }
116
117 #[must_use]
118 pub fn with_param(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
119 self.params.push((key.into(), value.into()));
120 self
121 }
122
123 #[must_use]
125 pub fn service_key(&self) -> String {
126 if self.group.is_empty() {
127 format!("{}:{}", self.interface, self.version)
128 } else {
129 format!("{}/{}:{}", self.group, self.interface, self.version)
130 }
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
140pub struct MetadataInfo {
141 pub application: String,
143 pub revision: u64,
145 pub services: Vec<ServiceDefinition>,
147 pub attributes: Vec<(String, String)>,
149}
150
151impl MetadataInfo {
152 #[must_use]
153 pub fn new(application: impl Into<String>) -> Self {
154 Self {
155 application: application.into(),
156 revision: 0,
157 services: Vec::new(),
158 attributes: Vec::new(),
159 }
160 }
161
162 #[must_use]
163 pub fn with_revision(mut self, revision: u64) -> Self {
164 self.revision = revision;
165 self
166 }
167
168 #[must_use]
169 pub fn with_service(mut self, service: ServiceDefinition) -> Self {
170 self.services.push(service);
171 self
172 }
173
174 #[must_use]
175 pub fn with_attr(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
176 self.attributes.push((key.into(), value.into()));
177 self
178 }
179
180 #[must_use]
182 pub fn find_service(&self, interface: &str) -> Option<&ServiceDefinition> {
183 self.services.iter().find(|s| s.interface == interface)
184 }
185
186 #[must_use]
188 pub fn bump_revision(mut self) -> Self {
189 self.revision += 1;
190 self
191 }
192}
193
194pub trait MetadataStorage: Send + Sync {
199 fn store(&self, metadata: MetadataInfo);
201
202 fn get(&self, application: &str) -> Option<MetadataInfo>;
204
205 fn remove(&self, application: &str) -> Option<MetadataInfo>;
207
208 fn applications(&self) -> Vec<String>;
210}
211
212pub struct InMemoryMetadataStorage {
217 store: DashMap<String, MetadataInfo>,
218}
219
220impl InMemoryMetadataStorage {
221 #[must_use]
222 pub fn new() -> Self {
223 Self {
224 store: DashMap::new(),
225 }
226 }
227}
228
229impl Default for InMemoryMetadataStorage {
230 fn default() -> Self {
231 Self::new()
232 }
233}
234
235impl MetadataStorage for InMemoryMetadataStorage {
236 fn store(&self, metadata: MetadataInfo) {
237 self.store.insert(metadata.application.clone(), metadata);
238 }
239
240 fn get(&self, application: &str) -> Option<MetadataInfo> {
241 self.store.get(application).map(|entry| entry.clone())
242 }
243
244 fn remove(&self, application: &str) -> Option<MetadataInfo> {
245 self.store.remove(application).map(|(_, v)| v)
246 }
247
248 fn applications(&self) -> Vec<String> {
249 self.store.iter().map(|entry| entry.key().clone()).collect()
250 }
251}
252
253struct NoopWatcher;
258
259impl zookeeper::Watcher for NoopWatcher {
260 fn handle(&self, _event: zookeeper::WatchedEvent) {}
261}
262
263pub struct ZkMetadataStorage {
272 zk_addr: String,
273 root_path: String,
274 zk: RwLock<Option<zookeeper::ZooKeeper>>,
275}
276
277impl ZkMetadataStorage {
278 #[must_use]
282 pub fn new(zk_addr: &str) -> Self {
283 Self {
284 zk_addr: zk_addr.to_string(),
285 root_path: "/dubbo/metadata".to_string(),
286 zk: RwLock::new(None),
287 }
288 }
289
290 #[must_use]
292 pub fn with_root_path(mut self, path: &str) -> Self {
293 self.root_path = path.to_string();
294 self
295 }
296
297 fn ensure_connection(&self) -> Result<(), String> {
299 {
300 let guard = self.zk.read().map_err(|e| e.to_string())?;
301 if guard.is_some() {
302 return Ok(());
303 }
304 }
305
306 let mut guard = self.zk.write().map_err(|e| e.to_string())?;
307 if guard.is_some() {
308 return Ok(());
309 }
310
311 let zk = zookeeper::ZooKeeper::connect(&self.zk_addr, Duration::from_secs(5), NoopWatcher)
312 .map_err(|e| format!("ZK connect error: {e}"))?;
313
314 *guard = Some(zk);
315 Ok(())
316 }
317
318 fn app_path(&self, app: &str) -> String {
320 format!("{}/{}", self.root_path, app)
321 }
322
323 fn ensure_path(&self, path: &str) -> Result<(), String> {
326 let guard = self.zk.read().map_err(|e| e.to_string())?;
327 let zk = guard.as_ref().ok_or("ZK not connected")?;
328
329 let parts: Vec<&str> = path.split('/').filter(|s| !s.is_empty()).collect();
330 let mut current = String::new();
331
332 for part in &parts {
333 current.push('/');
334 current.push_str(part);
335
336 if zk
337 .exists(¤t, false)
338 .map_err(|e| format!("ZK exists error for {current}: {e}"))?
339 .is_none()
340 {
341 zk.create(
342 ¤t,
343 Vec::new(),
344 zookeeper::Acl::open_unsafe().clone(),
345 zookeeper::CreateMode::Persistent,
346 )
347 .map_err(|e| format!("ZK create path error for {current}: {e}"))?;
348 }
349 }
350
351 Ok(())
352 }
353
354 fn with_zk<F, T>(&self, f: F) -> Result<T, String>
356 where
357 F: FnOnce(&zookeeper::ZooKeeper) -> Result<T, String>,
358 {
359 let guard = self.zk.read().map_err(|e| e.to_string())?;
360 let zk = guard.as_ref().ok_or("ZK not connected")?;
361 f(zk)
362 }
363}
364
365impl MetadataStorage for ZkMetadataStorage {
366 fn store(&self, metadata: MetadataInfo) {
367 if self.ensure_connection().is_err() {
368 return;
369 }
370
371 let path = self.app_path(&metadata.application);
372 if self.ensure_path(&path).is_err() {
373 return;
374 }
375
376 let Ok(json) = serde_json::to_vec(&metadata) else {
377 return;
378 };
379
380 let _ = self.with_zk(|zk| {
381 match zk.exists(&path, false) {
382 Ok(Some(_)) => {
383 zk.set_data(&path, json, None)
384 .map_err(|e| format!("ZK set_data error: {e}"))?;
385 }
386 Ok(None) => {
387 zk.create(
388 &path,
389 json,
390 zookeeper::Acl::open_unsafe().clone(),
391 zookeeper::CreateMode::Persistent,
392 )
393 .map_err(|e| format!("ZK create error: {e}"))?;
394 }
395 Err(e) => return Err(format!("ZK exists error: {e}")),
396 }
397 Ok(())
398 });
399 }
400
401 fn get(&self, application: &str) -> Option<MetadataInfo> {
402 if self.ensure_connection().is_err() {
403 return None;
404 }
405
406 let path = self.app_path(application);
407
408 self.with_zk(|zk| {
409 let (data, _) = zk
410 .get_data(&path, false)
411 .map_err(|e| format!("ZK get_data error: {e}"))?;
412 let metadata: MetadataInfo = serde_json::from_slice(&data)
413 .map_err(|e| format!("JSON deserialize error: {e}"))?;
414 Ok(metadata)
415 })
416 .ok()
417 }
418
419 fn remove(&self, application: &str) -> Option<MetadataInfo> {
420 let removed = self.get(application)?;
421 let path = self.app_path(application);
422
423 let _ = self.with_zk(|zk| {
424 zk.delete(&path, None)
425 .map_err(|e| format!("ZK delete error: {e}"))
426 });
427
428 Some(removed)
429 }
430
431 fn applications(&self) -> Vec<String> {
432 if self.ensure_connection().is_err() {
433 return Vec::new();
434 }
435
436 self.with_zk(|zk| {
437 zk.get_children(&self.root_path, false)
438 .map_err(|e| format!("ZK get_children error: {e}"))
439 })
440 .unwrap_or_default()
441 }
442}
443
444pub struct NacosMetadataStorage {
458 server_addr: String,
459 namespace: String,
460 group: String,
461 data_id_prefix: String,
462 client: reqwest::blocking::Client,
463 known_apps: DashMap<String, ()>,
464}
465
466impl NacosMetadataStorage {
467 #[must_use]
472 pub fn new(server_addr: &str) -> Self {
473 Self {
474 server_addr: server_addr.to_string(),
475 namespace: "public".to_string(),
476 group: "dubbo".to_string(),
477 data_id_prefix: "dubbo.metadata.".to_string(),
478 client: reqwest::blocking::Client::new(),
479 known_apps: DashMap::new(),
480 }
481 }
482
483 #[must_use]
485 pub fn with_namespace(mut self, ns: &str) -> Self {
486 self.namespace = ns.to_string();
487 self
488 }
489
490 #[must_use]
492 pub fn with_group(mut self, group: &str) -> Self {
493 self.group = group.to_string();
494 self
495 }
496
497 #[must_use]
502 pub fn with_data_id_prefix(mut self, prefix: &str) -> Self {
503 self.data_id_prefix = prefix.to_string();
504 self
505 }
506
507 #[must_use]
511 pub fn data_id_for(&self, app: &str) -> String {
512 format!("{}{}", self.data_id_prefix, app)
513 }
514}
515
516impl MetadataStorage for NacosMetadataStorage {
517 fn store(&self, metadata: MetadataInfo) {
518 let app = metadata.application.clone();
519 let data_id = self.data_id_for(&app);
520
521 let json = match serde_json::to_string(&metadata) {
522 Ok(data) => data,
523 Err(e) => {
524 tracing::warn!("Nacos store: JSON serialization failed for app '{app}': {e}");
525 return;
526 }
527 };
528
529 let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
530 let result = self
531 .client
532 .post(&url)
533 .form(&[
534 ("dataId", data_id.as_str()),
535 ("group", self.group.as_str()),
536 ("content", json.as_str()),
537 ("namespaceId", self.namespace.as_str()),
538 ("type", "json"),
539 ])
540 .send();
541
542 match result {
543 Ok(resp) if resp.status().is_success() => {
544 self.known_apps.insert(app, ());
545 }
546 Ok(resp) => {
547 tracing::warn!(
548 "Nacos store: server returned status {} for app '{app}'",
549 resp.status()
550 );
551 }
552 Err(e) => {
553 tracing::warn!("Nacos store: HTTP request failed for app '{app}': {e}");
554 }
555 }
556 }
557
558 fn get(&self, application: &str) -> Option<MetadataInfo> {
559 let data_id = self.data_id_for(application);
560 let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
561
562 let resp = self
563 .client
564 .get(&url)
565 .query(&[
566 ("dataId", data_id.as_str()),
567 ("group", self.group.as_str()),
568 ("namespaceId", self.namespace.as_str()),
569 ])
570 .send();
571
572 match resp {
573 Ok(resp) if resp.status().is_success() => match resp.text() {
574 Ok(text) if !text.is_empty() => serde_json::from_str::<MetadataInfo>(&text)
575 .map_err(|e| {
576 tracing::warn!(
577 "Nacos get: JSON deserialization failed for app '{application}': {e}"
578 );
579 e
580 })
581 .ok(),
582 _ => None,
583 },
584 Ok(resp) => {
585 tracing::warn!(
586 "Nacos get: server returned status {} for app '{application}'",
587 resp.status()
588 );
589 None
590 }
591 Err(e) => {
592 tracing::warn!("Nacos get: HTTP request failed for app '{application}': {e}");
593 None
594 }
595 }
596 }
597
598 fn remove(&self, application: &str) -> Option<MetadataInfo> {
599 let existing = self.get(application)?;
600
601 let data_id = self.data_id_for(application);
602 let url = format!("{}/nacos/v1/cs/configs", self.server_addr);
603
604 let result = self
605 .client
606 .delete(&url)
607 .query(&[
608 ("dataId", data_id.as_str()),
609 ("group", self.group.as_str()),
610 ("namespaceId", self.namespace.as_str()),
611 ])
612 .send();
613
614 match result {
615 Ok(resp) if resp.status().is_success() => {
616 self.known_apps.remove(application);
617 }
618 Ok(resp) => {
619 tracing::warn!(
620 "Nacos remove: server returned status {} for app '{application}'",
621 resp.status()
622 );
623 }
624 Err(e) => {
625 tracing::warn!("Nacos remove: HTTP request failed for app '{application}': {e}");
626 }
627 }
628
629 Some(existing)
630 }
631
632 fn applications(&self) -> Vec<String> {
633 self.known_apps.iter().map(|e| e.key().clone()).collect()
634 }
635}
636
637#[async_trait::async_trait]
643pub trait MetadataService: Send + Sync {
644 async fn get_metadata_info(&self, application: String) -> Option<MetadataInfo>;
646
647 async fn get_service_definition(&self, path: String) -> Option<String>;
649
650 async fn get_exported_service_urls(&self, application: String) -> Vec<String>;
652
653 async fn echo(&self, msg: String) -> String;
655}
656
657pub struct DefaultMetadataService {
659 storage: Arc<dyn MetadataStorage>,
660}
661
662impl DefaultMetadataService {
663 #[must_use]
664 pub fn new(storage: Arc<dyn MetadataStorage>) -> Self {
665 Self { storage }
666 }
667}
668
669#[async_trait::async_trait]
670impl MetadataService for DefaultMetadataService {
671 async fn get_metadata_info(&self, application: String) -> Option<MetadataInfo> {
672 self.storage.get(&application)
673 }
674
675 async fn get_service_definition(&self, path: String) -> Option<String> {
676 for app in self.storage.applications() {
677 if let Some(meta) = self.storage.get(&app) {
678 for svc in &meta.services {
679 if svc.interface == path {
680 return serde_json::to_string(svc).ok();
681 }
682 }
683 }
684 }
685 None
686 }
687
688 async fn get_exported_service_urls(&self, application: String) -> Vec<String> {
689 self.storage
690 .get(&application)
691 .map(|meta| {
692 meta.services
693 .iter()
694 .map(|svc| format!("{}:{}", svc.interface, svc.version))
695 .collect()
696 })
697 .unwrap_or_default()
698 }
699
700 async fn echo(&self, msg: String) -> String {
701 msg
702 }
703}
704
/// Unit tests for the metadata model, the storage backends and the default
/// metadata service.
///
/// The ZooKeeper tests never reach a real server. The Nacos tests that need
/// a server use the throwaway HTTP responders defined in this module.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- model builders ----

    #[test]
    fn test_method_definition_builder() {
        let method = MethodDefinition::new("sayHello", "Ljava/lang/String;")
            .with_param("Ljava/lang/String;")
            .with_oneway(false);

        assert_eq!(method.name, "sayHello");
        assert_eq!(method.return_type, "Ljava/lang/String;");
        assert_eq!(method.parameter_types, vec!["Ljava/lang/String;"]);
        assert!(!method.oneway);
    }

    #[test]
    fn test_method_definition_oneway() {
        let method = MethodDefinition::new("notify", "V").with_oneway(true);

        assert!(method.oneway);
        assert_eq!(method.return_type, "V");
    }

    #[test]
    fn test_service_definition_builder() {
        let svc = ServiceDefinition::new("com.example.Greeter")
            .with_version("1.0.0")
            .with_group("default")
            .with_method(
                MethodDefinition::new("sayHello", "Ljava/lang/String;")
                    .with_param("Ljava/lang/String;"),
            )
            .with_method(MethodDefinition::new("echo", "Ljava/lang/String;"))
            .with_param("timeout", "3000");

        assert_eq!(svc.interface, "com.example.Greeter");
        assert_eq!(svc.version, "1.0.0");
        assert_eq!(svc.group, "default");
        assert_eq!(svc.methods.len(), 2);
        assert_eq!(svc.params.len(), 1);
    }

    #[test]
    fn test_service_key_format() {
        let svc = ServiceDefinition::new("com.example.Greeter").with_version("1.0.0");
        assert_eq!(svc.service_key(), "com.example.Greeter:1.0.0");

        let svc_grouped = ServiceDefinition::new("com.example.Greeter")
            .with_version("1.0.0")
            .with_group("dev");
        assert_eq!(svc_grouped.service_key(), "dev/com.example.Greeter:1.0.0");
    }

    #[test]
    fn test_metadata_info_builder() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(3)
            .with_service(
                ServiceDefinition::new("com.example.Greeter")
                    .with_method(MethodDefinition::new("sayHello", "V")),
            )
            .with_attr("owner", "team-a");

        assert_eq!(meta.application, "demo-provider");
        assert_eq!(meta.revision, 3);
        assert_eq!(meta.services.len(), 1);
        assert_eq!(meta.attributes.len(), 1);
    }

    #[test]
    fn test_metadata_info_find_service() {
        let meta = MetadataInfo::new("demo-provider")
            .with_service(ServiceDefinition::new("com.example.Greeter"))
            .with_service(ServiceDefinition::new("com.example.UserService"));

        assert!(meta.find_service("com.example.Greeter").is_some());
        assert!(meta.find_service("com.example.UserService").is_some());
        assert!(meta.find_service("com.example.Unknown").is_none());
    }

    #[test]
    fn test_metadata_info_bump_revision() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(0)
            .bump_revision();

        assert_eq!(meta.revision, 1);
    }

    // ---- in-memory storage ----

    #[test]
    fn test_in_memory_storage_store_and_get() {
        let storage = InMemoryMetadataStorage::new();

        let meta = MetadataInfo::new("demo-provider")
            .with_service(ServiceDefinition::new("com.example.Greeter"));

        storage.store(meta.clone());

        let retrieved = storage.get("demo-provider");
        assert!(retrieved.is_some());
        assert_eq!(retrieved.unwrap().application, "demo-provider");
    }

    #[test]
    fn test_in_memory_storage_get_nonexistent() {
        let storage = InMemoryMetadataStorage::new();
        assert!(storage.get("unknown-app").is_none());
    }

    #[test]
    fn test_in_memory_storage_remove() {
        let storage = InMemoryMetadataStorage::new();
        let meta = MetadataInfo::new("demo-provider");
        storage.store(meta);

        let removed = storage.remove("demo-provider");
        assert!(removed.is_some());

        assert!(storage.get("demo-provider").is_none());
    }

    #[test]
    fn test_in_memory_storage_applications() {
        let storage = InMemoryMetadataStorage::new();

        storage.store(MetadataInfo::new("app-a"));
        storage.store(MetadataInfo::new("app-b"));
        storage.store(MetadataInfo::new("app-c"));

        // Map iteration order is unspecified, so sort before comparing.
        let mut apps = storage.applications();
        apps.sort();
        assert_eq!(apps, vec!["app-a", "app-b", "app-c"]);
    }

    #[test]
    fn test_in_memory_storage_overwrite() {
        let storage = InMemoryMetadataStorage::new();

        let meta_v1 = MetadataInfo::new("demo-provider").with_revision(1);
        storage.store(meta_v1);

        let meta_v2 = MetadataInfo::new("demo-provider").with_revision(2);
        storage.store(meta_v2);

        let retrieved = storage.get("demo-provider").unwrap();
        assert_eq!(retrieved.revision, 2);
    }

    // ---- default metadata service ----

    #[test]
    fn test_metadata_service_get_metadata_info() {
        let storage = Arc::new(InMemoryMetadataStorage::new());
        storage.store(
            MetadataInfo::new("demo-provider")
                .with_service(ServiceDefinition::new("com.example.Greeter")),
        );

        let service = DefaultMetadataService::new(storage);

        let rt = tokio::runtime::Runtime::new().unwrap();
        let result = rt.block_on(service.get_metadata_info("demo-provider".into()));
        assert!(result.is_some());
        assert_eq!(result.unwrap().services.len(), 1);
    }

    #[test]
    fn test_metadata_service_echo() {
        let storage = Arc::new(InMemoryMetadataStorage::new());
        let service = DefaultMetadataService::new(storage);

        let rt = tokio::runtime::Runtime::new().unwrap();
        let result = rt.block_on(service.echo("ping".into()));
        assert_eq!(result, "ping");
    }

    #[test]
    fn test_metadata_service_get_missing_metadata() {
        let storage = Arc::new(InMemoryMetadataStorage::new());
        let service = DefaultMetadataService::new(storage);

        let rt = tokio::runtime::Runtime::new().unwrap();
        let result = rt.block_on(service.get_metadata_info("unknown".into()));
        assert!(result.is_none());
    }

    #[test]
    fn test_metadata_service_get_exported_urls() {
        let storage = Arc::new(InMemoryMetadataStorage::new());
        storage.store(
            MetadataInfo::new("demo-provider")
                .with_service(ServiceDefinition::new("com.example.Greeter").with_version("1.0.0"))
                .with_service(
                    ServiceDefinition::new("com.example.UserService").with_version("2.0.0"),
                ),
        );

        let service = DefaultMetadataService::new(storage);

        let rt = tokio::runtime::Runtime::new().unwrap();
        let urls = rt.block_on(service.get_exported_service_urls("demo-provider".into()));
        assert_eq!(urls.len(), 2);
        assert!(urls.contains(&"com.example.Greeter:1.0.0".to_string()));
        assert!(urls.contains(&"com.example.UserService:2.0.0".to_string()));
    }

    #[test]
    fn test_metadata_service_get_service_definition() {
        let storage = Arc::new(InMemoryMetadataStorage::new());

        let svc = ServiceDefinition::new("com.example.Greeter")
            .with_version("1.0.0")
            .with_method(MethodDefinition::new("sayHello", "Ljava/lang/String;"));
        let iface = svc.interface.clone();

        storage.store(MetadataInfo::new("demo-provider").with_service(svc));

        let service = DefaultMetadataService::new(storage);

        let rt = tokio::runtime::Runtime::new().unwrap();
        let result = rt.block_on(service.get_service_definition(iface));
        assert!(result.is_some());
        let def_str = result.unwrap();
        assert!(def_str.contains("com.example.Greeter"));
        assert!(def_str.contains("1.0.0"));
    }

    // ---- serde and small model edge cases ----

    #[test]
    fn test_serde_roundtrip_metadata_info() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(5)
            .with_service(
                ServiceDefinition::new("com.example.Greeter")
                    .with_method(MethodDefinition::new("sayHello", "V")),
            );

        let json = serde_json::to_string(&meta).unwrap();
        let parsed: MetadataInfo = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.application, meta.application);
        assert_eq!(parsed.revision, meta.revision);
        assert_eq!(parsed.services.len(), meta.services.len());
    }

    #[test]
    fn test_service_definition_empty_group() {
        let svc = ServiceDefinition::new("com.example.Foo").with_version("2.0");
        assert_eq!(svc.group, "");
        assert_eq!(svc.service_key(), "com.example.Foo:2.0");
    }

    #[test]
    fn test_method_definition_empty_params() {
        let method = MethodDefinition::new("ping", "V");
        assert!(method.parameter_types.is_empty());
    }

    #[test]
    fn test_stream_type_default_is_unary() {
        let method = MethodDefinition::new("sayHello", "V");
        assert_eq!(method.stream_type, StreamType::Unary);
        assert!(!method.is_streaming());
    }

    #[test]
    fn test_method_definition_streaming() {
        let method =
            MethodDefinition::new("upload", "V").with_stream_type(StreamType::ClientStreaming);
        assert_eq!(method.stream_type, StreamType::ClientStreaming);
        assert!(method.is_streaming());
    }

    #[test]
    fn test_stream_type_all_variants() {
        let variants = [
            (StreamType::Unary, false),
            (StreamType::ClientStreaming, true),
            (StreamType::ServerStreaming, true),
            (StreamType::BidiStreaming, true),
        ];

        for (st, expected) in variants {
            let method = MethodDefinition::new("m", "V").with_stream_type(st);
            assert_eq!(method.is_streaming(), expected);
        }
    }

    // ---- ZooKeeper storage (no server involved) ----

    #[test]
    fn test_zk_metadata_storage_new() {
        let storage = ZkMetadataStorage::new("127.0.0.1:2181");
        assert_eq!(storage.zk_addr, "127.0.0.1:2181");
        assert_eq!(storage.root_path, "/dubbo/metadata");
        assert!(storage.zk.read().unwrap().is_none());
    }

    #[test]
    fn test_zk_metadata_storage_with_root_path() {
        let storage = ZkMetadataStorage::new("127.0.0.1:2181").with_root_path("/custom/metadata");
        assert_eq!(storage.root_path, "/custom/metadata");
    }

    #[test]
    fn test_zk_metadata_storage_app_path() {
        let storage = ZkMetadataStorage::new("127.0.0.1:2181");
        assert_eq!(
            storage.app_path("demo-provider"),
            "/dubbo/metadata/demo-provider"
        );

        let storage_custom = ZkMetadataStorage::new("127.0.0.1:2181").with_root_path("/myapp/meta");
        assert_eq!(storage_custom.app_path("my-app"), "/myapp/meta/my-app");
    }

    #[test]
    fn test_zk_metadata_store_serialization() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(3)
            .with_service(
                ServiceDefinition::new("com.example.Greeter")
                    .with_version("1.0.0")
                    .with_method(
                        MethodDefinition::new("sayHello", "Ljava/lang/String;")
                            .with_param("Ljava/lang/String;"),
                    ),
            )
            .with_attr("owner", "team-a");

        let json = serde_json::to_vec(&meta).unwrap();
        let parsed: MetadataInfo = serde_json::from_slice(&json).unwrap();

        assert_eq!(parsed.application, "demo-provider");
        assert_eq!(parsed.revision, 3);
        assert_eq!(parsed.services.len(), 1);
        assert_eq!(parsed.services[0].interface, "com.example.Greeter");
        assert_eq!(parsed.attributes.len(), 1);
    }

    #[test]
    fn test_zk_metadata_get_without_connection_returns_none() {
        // The address is deliberately invalid so no connection can be made.
        let storage = ZkMetadataStorage::new("256.256.256.256:99999");
        let result = storage.get("any-app");
        assert!(result.is_none());
    }

    #[test]
    fn test_zk_metadata_remove_without_connection_returns_none() {
        let storage = ZkMetadataStorage::new("256.256.256.256:99999");
        let result = storage.remove("any-app");
        assert!(result.is_none());
    }

    #[test]
    fn test_in_memory_store_still_works() {
        let storage = InMemoryMetadataStorage::new();
        let meta = MetadataInfo::new("verify-app")
            .with_revision(42)
            .with_service(ServiceDefinition::new("com.example.VerifyService"));

        storage.store(meta);
        let retrieved = storage.get("verify-app").unwrap();
        assert_eq!(retrieved.application, "verify-app");
        assert_eq!(retrieved.revision, 42);

        let apps = storage.applications();
        assert_eq!(apps, vec!["verify-app"]);

        let removed = storage.remove("verify-app").unwrap();
        assert_eq!(removed.application, "verify-app");
        assert!(storage.get("verify-app").is_none());
    }

    #[test]
    fn test_metadata_info_json_roundtrip() {
        let meta = MetadataInfo::new("roundtrip-app")
            .with_revision(7)
            .with_service(
                ServiceDefinition::new("com.example.Svc")
                    .with_version("2.0.0")
                    .with_group("prod")
                    .with_method(
                        MethodDefinition::new("process", "V")
                            .with_param("Ljava/lang/String;")
                            .with_stream_type(StreamType::BidiStreaming),
                    )
                    .with_param("timeout", "5000"),
            )
            .with_attr("env", "production");

        let json_bytes = serde_json::to_vec(&meta).unwrap();
        let deserialized: MetadataInfo = serde_json::from_slice(&json_bytes).unwrap();

        assert_eq!(deserialized, meta);
    }

    // ---- mock HTTP responders for the Nacos tests ----

    /// Starts a local HTTP responder that answers up to 10 connections with
    /// the same `status` and `body`, and returns its base URL.
    fn start_mock_http_server(status: u16, body: impl Into<String>) -> String {
        start_multi_mock_http_server(vec![(status, body.into()); 10])
    }

    /// Starts a local HTTP responder that answers one connection per entry
    /// of `responses`, in order, and returns its base URL.
    ///
    /// Each connection is read once, answered and closed; the request
    /// content is ignored. The responder thread ends after the last entry
    /// or when `accept` fails.
    fn start_multi_mock_http_server(responses: Vec<(u16, String)>) -> String {
        use std::io::{Read, Write};

        // `bind` returns a socket that is already listening, so connections
        // made before the thread reaches `accept` simply wait in the backlog.
        // No start-up sleep is needed.
        let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();

        std::thread::spawn(move || {
            for (status, body) in responses {
                let Ok((mut stream, _)) = listener.accept() else {
                    break;
                };

                let mut buf = vec![0u8; 65536];
                let _ = stream.read(&mut buf);
                let resp = format!(
                    "HTTP/1.1 {status} OK\r\nContent-Length: {}\r\nContent-Type: text/plain\r\nConnection: close\r\n\r\n{body}",
                    body.len()
                );
                let _ = stream.write_all(resp.as_bytes());
                let _ = stream.flush();
            }
        });

        format!("http://{addr}")
    }

    // ---- Nacos storage ----

    #[test]
    fn test_nacos_metadata_storage_new() {
        let storage = NacosMetadataStorage::new("http://127.0.0.1:8848");
        assert_eq!(storage.server_addr, "http://127.0.0.1:8848");
        assert_eq!(storage.namespace, "public");
        assert_eq!(storage.group, "dubbo");
        assert_eq!(storage.data_id_prefix, "dubbo.metadata.");
        assert!(storage.known_apps.is_empty());
    }

    #[test]
    fn test_nacos_metadata_storage_builder_chaining() {
        let storage = NacosMetadataStorage::new("http://nacos:8848")
            .with_namespace("dev")
            .with_group("my-group")
            .with_data_id_prefix("custom.");
        assert_eq!(storage.namespace, "dev");
        assert_eq!(storage.group, "my-group");
        assert_eq!(storage.data_id_prefix, "custom.");
    }

    #[test]
    fn test_nacos_metadata_storage_default_values() {
        let storage = NacosMetadataStorage::new("http://127.0.0.1:8848");
        assert_eq!(storage.namespace, "public");
        assert_eq!(storage.group, "dubbo");
        assert_eq!(storage.data_id_prefix, "dubbo.metadata.");
    }

    #[test]
    fn test_nacos_metadata_storage_data_id_format() {
        let storage = NacosMetadataStorage::new("http://127.0.0.1:8848");
        assert_eq!(
            storage.data_id_for("demo-provider"),
            "dubbo.metadata.demo-provider"
        );

        let custom =
            NacosMetadataStorage::new("http://127.0.0.1:8848").with_data_id_prefix("meta.");
        assert_eq!(custom.data_id_for("my-app"), "meta.my-app");
    }

    #[test]
    fn test_nacos_metadata_store_serialization_roundtrip() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(3)
            .with_service(
                ServiceDefinition::new("com.example.Greeter")
                    .with_version("1.0.0")
                    .with_method(MethodDefinition::new("sayHello", "Ljava/lang/String;")),
            )
            .with_attr("owner", "team-a");

        let json = serde_json::to_string(&meta).unwrap();
        let parsed: MetadataInfo = serde_json::from_str(&json).unwrap();
        assert_eq!(parsed, meta);
    }

    #[test]
    fn test_nacos_metadata_get_without_server_returns_none() {
        // Nothing is expected to listen on port 1, so the request fails.
        let storage = NacosMetadataStorage::new("http://127.0.0.1:1");
        let result = storage.get("any-app");
        assert!(result.is_none());
    }

    #[test]
    fn test_nacos_metadata_remove_without_server_returns_none() {
        let storage = NacosMetadataStorage::new("http://127.0.0.1:1");
        let result = storage.remove("any-app");
        assert!(result.is_none());
    }

    #[test]
    fn test_nacos_metadata_applications_cache() {
        let storage = NacosMetadataStorage::new("http://127.0.0.1:1");

        storage.known_apps.insert("app-a".to_string(), ());
        storage.known_apps.insert("app-b".to_string(), ());
        storage.known_apps.insert("app-c".to_string(), ());

        let mut apps = storage.applications();
        apps.sort();
        assert_eq!(apps, vec!["app-a", "app-b", "app-c"]);

        storage.known_apps.remove("app-b");
        let mut apps = storage.applications();
        apps.sort();
        assert_eq!(apps, vec!["app-a", "app-c"]);
    }

    #[test]
    fn test_nacos_metadata_store_caches_app_name() {
        let url = start_mock_http_server(200, "true");
        let storage = NacosMetadataStorage::new(&url);

        let meta = MetadataInfo::new("demo-provider")
            .with_revision(1)
            .with_service(ServiceDefinition::new("com.example.Greeter"));

        storage.store(meta);

        let apps = storage.applications();
        assert_eq!(apps, vec!["demo-provider"]);
    }

    #[test]
    fn test_nacos_metadata_remove_clears_cache() {
        let meta = MetadataInfo::new("demo-provider")
            .with_revision(1)
            .with_service(ServiceDefinition::new("com.example.Greeter"));
        let json = serde_json::to_string(&meta).unwrap();

        // First response answers the GET inside `remove`, second the DELETE.
        let url = start_multi_mock_http_server(vec![(200, json), (200, "true".to_string())]);

        let storage = NacosMetadataStorage::new(&url);
        storage.known_apps.insert("demo-provider".to_string(), ());

        let removed = storage.remove("demo-provider");
        assert!(removed.is_some());
        assert_eq!(removed.unwrap().application, "demo-provider");

        let apps = storage.applications();
        assert!(apps.is_empty());
    }

    #[test]
    fn test_nacos_metadata_storage_with_namespace() {
        let storage = NacosMetadataStorage::new("http://nacos:8848").with_namespace("production");
        assert_eq!(storage.namespace, "production");
    }

    #[test]
    fn test_nacos_metadata_storage_with_group() {
        let storage = NacosMetadataStorage::new("http://nacos:8848").with_group("custom-group");
        assert_eq!(storage.group, "custom-group");
    }
}
1289}