1#![allow(
2 clippy::doc_markdown,
3 clippy::missing_panics_doc,
4 clippy::assigning_clones
5)]
6
7pub use dubbo_rs_common;
8pub use dubbo_rs_protocol;
9pub use dubbo_rs_registry;
10
11use std::sync::{Arc, RwLock};
12
13use async_trait::async_trait;
14use dubbo_rs_common::error::RPCError;
15use dubbo_rs_common::node::Node;
16use dubbo_rs_common::url::URL;
17use dubbo_rs_protocol::{InvocationContext, Invoker, RPCResult};
18use dubbo_rs_registry::{NotifyListener, ServiceEvent};
19
20#[async_trait]
25pub trait Directory: Send + Sync {
26 async fn list(&self, ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError>;
34
35 fn get_url(&self) -> &URL;
37}
38
39#[async_trait]
46pub trait Cluster: Send + Sync {
47 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError>;
53}
54
55pub struct StaticDirectory {
60 url: URL,
61 invokers: RwLock<Vec<Arc<dyn Invoker>>>,
62}
63
64impl StaticDirectory {
65 #[must_use]
66 pub fn new(url: URL) -> Self {
67 Self {
68 url,
69 invokers: RwLock::new(Vec::new()),
70 }
71 }
72
73 pub fn add_invoker(&self, invoker: Arc<dyn Invoker>) {
74 self.invokers.write().unwrap().push(invoker);
75 }
76
77 #[must_use]
78 pub fn invoker_count(&self) -> usize {
79 self.invokers.read().unwrap().len()
80 }
81}
82
83#[async_trait]
84impl Directory for StaticDirectory {
85 async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
86 let invokers = self.invokers.read().unwrap();
87 if invokers.is_empty() {
88 return Err(RPCError::ServiceNotFound(format!(
89 "no invokers available for {}",
90 self.url.path
91 )));
92 }
93
94 Ok(invokers
95 .iter()
96 .filter(|i| i.is_available())
97 .map(Arc::clone)
98 .collect())
99 }
100
101 fn get_url(&self) -> &URL {
102 &self.url
103 }
104}
105
106type InvokerFactory = Box<dyn Fn(&URL) -> Result<Box<dyn Invoker>, RPCError> + Send + Sync>;
107
108pub struct RegistryDirectory {
109 service_url: URL,
110 invokers: RwLock<Vec<Arc<dyn Invoker>>>,
111 provider_urls: RwLock<Vec<URL>>,
112 invoker_factory: Option<InvokerFactory>,
113}
114
115impl RegistryDirectory {
116 #[must_use]
117 pub fn new(service_url: URL) -> Self {
118 Self {
119 service_url,
120 invokers: RwLock::new(Vec::new()),
121 provider_urls: RwLock::new(Vec::new()),
122 invoker_factory: None,
123 }
124 }
125
126 #[must_use]
127 pub fn with_invoker_factory<F>(mut self, factory: F) -> Self
128 where
129 F: Fn(&URL) -> Result<Box<dyn Invoker>, RPCError> + Send + Sync + 'static,
130 {
131 self.invoker_factory = Some(Box::new(factory));
132 self
133 }
134
135 pub fn refresh_invokers(&self, provider_urls: &[URL]) {
137 let mut invokers: Vec<Arc<dyn Invoker>> = Vec::new();
138 for url in provider_urls {
139 if let Some(ref factory) = self.invoker_factory {
140 match factory(url) {
141 Ok(inv) => invokers.push(Arc::from(inv)),
142 Err(e) => {
143 tracing::warn!("failed to create invoker for {}: {e}", url.get_address());
144 }
145 }
146 } else {
147 invokers.push(Arc::new(ProviderInvoker {
148 provider_url: url.clone(),
149 }));
150 }
151 }
152 let mut guard = self.invokers.write().unwrap();
153 *guard = invokers;
154
155 let mut urls = self.provider_urls.write().unwrap();
156 *urls = provider_urls.to_vec();
157 }
158
159 #[must_use]
160 pub fn invoker_count(&self) -> usize {
161 self.invokers.read().unwrap().len()
162 }
163}
164
165#[async_trait]
166impl Directory for RegistryDirectory {
167 async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
168 let invokers = self.invokers.read().unwrap();
169 if invokers.is_empty() {
170 return Err(RPCError::ServiceNotFound(format!(
171 "no providers registered for {}",
172 self.service_url.path
173 )));
174 }
175
176 Ok(invokers
177 .iter()
178 .filter(|i| i.is_available())
179 .map(Arc::clone)
180 .collect())
181 }
182
183 fn get_url(&self) -> &URL {
184 &self.service_url
185 }
186}
187
188#[async_trait]
189impl NotifyListener for RegistryDirectory {
190 async fn notify(&self, event: ServiceEvent) {
191 match event {
192 ServiceEvent::Add(urls) | ServiceEvent::Update(urls) => {
193 let mut all = self.provider_urls.read().unwrap().clone();
194 for url in &urls {
195 if !all.iter().any(|u| u.get_address() == url.get_address()) {
196 all.push(url.clone());
197 }
198 }
199 self.refresh_invokers(&all);
200 }
201 ServiceEvent::Remove(urls) => {
202 let all: Vec<URL> = self
203 .provider_urls
204 .read()
205 .unwrap()
206 .iter()
207 .filter(|u| !urls.iter().any(|r| r.get_address() == u.get_address()))
208 .cloned()
209 .collect();
210 self.refresh_invokers(&all);
211 }
212 }
213 }
214
215 fn listen_url(&self) -> URL {
216 self.service_url.clone()
217 }
218}
219
220struct ProviderInvoker {
225 provider_url: URL,
226}
227
228impl Node for ProviderInvoker {
229 fn get_url(&self) -> &URL {
230 &self.provider_url
231 }
232
233 fn is_available(&self) -> bool {
234 true
235 }
236
237 fn destroy(&self) {}
238}
239
240#[async_trait]
241impl Invoker for ProviderInvoker {
242 async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
243 Err(anyhow::anyhow!(
244 "ProviderInvoker for {} has no protocol invoker — \
245 configure an invoker factory via RegistryDirectory::with_invoker_factory()",
246 self.provider_url.get_address()
247 ))
248 }
249}
250
251pub struct FailoverCluster {
252 retries: u32,
253}
254
255impl FailoverCluster {
256 #[must_use]
257 pub fn new() -> Self {
258 Self { retries: 2 }
259 }
260
261 #[must_use]
262 pub fn with_retries(mut self, retries: u32) -> Self {
263 self.retries = retries;
264 self
265 }
266}
267
268impl Default for FailoverCluster {
269 fn default() -> Self {
270 Self::new()
271 }
272}
273
274#[async_trait]
275impl Cluster for FailoverCluster {
276 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
277 Ok(Box::new(FailoverClusterInvoker {
278 directory,
279 retries: self.retries,
280 }))
281 }
282}
283
284struct FailoverClusterInvoker {
285 directory: Box<dyn Directory>,
286 retries: u32,
287}
288
289impl Node for FailoverClusterInvoker {
290 fn get_url(&self) -> &URL {
291 self.directory.get_url()
292 }
293
294 fn is_available(&self) -> bool {
295 true
296 }
297
298 fn destroy(&self) {}
299}
300
301#[async_trait]
302impl Invoker for FailoverClusterInvoker {
303 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
304 let mut last_error: Option<RPCError> = None;
305
306 for attempt in 0..=self.retries {
307 let invokers = self
308 .directory
309 .list(ctx)
310 .await
311 .map_err(|e| anyhow::anyhow!("{e}"))?;
312
313 if invokers.is_empty() {
314 return Err(anyhow::anyhow!("no invokers available"));
315 }
316
317 for invoker in &invokers {
318 match invoker.invoke(ctx).await {
319 Ok(result) if !result.is_error() => return Ok(result),
320 Ok(result) => {
321 last_error = result.error.clone();
322 tracing::warn!(
323 "failover: attempt {}/{} failed with error {:?}",
324 attempt + 1,
325 self.retries + 1,
326 last_error
327 );
328 }
329 Err(e) => {
330 last_error = Some(RPCError::ServerError(format!("{e}")));
331 tracing::warn!(
332 "failover: attempt {}/{} failed: {}",
333 attempt + 1,
334 self.retries + 1,
335 e
336 );
337 }
338 }
339 }
340 }
341
342 Err(anyhow::anyhow!(
343 "failover: all {} attempts failed. last error: {:?}",
344 self.retries + 1,
345 last_error
346 ))
347 }
348}
349
350pub struct FailfastCluster;
351
352impl Default for FailfastCluster {
353 fn default() -> Self {
354 Self
355 }
356}
357
358#[async_trait]
359impl Cluster for FailfastCluster {
360 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
361 Ok(Box::new(FailfastClusterInvoker { directory }))
362 }
363}
364
365struct FailfastClusterInvoker {
366 directory: Box<dyn Directory>,
367}
368
369impl Node for FailfastClusterInvoker {
370 fn get_url(&self) -> &URL {
371 self.directory.get_url()
372 }
373
374 fn is_available(&self) -> bool {
375 true
376 }
377
378 fn destroy(&self) {}
379}
380
381#[async_trait]
382impl Invoker for FailfastClusterInvoker {
383 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
384 let invokers = self
385 .directory
386 .list(ctx)
387 .await
388 .map_err(|e| anyhow::anyhow!("{e}"))?;
389
390 if invokers.is_empty() {
391 return Err(anyhow::anyhow!("no invokers available"));
392 }
393
394 invokers[0].invoke(ctx).await
395 }
396}
397
398pub struct FailsafeCluster;
408
409impl Default for FailsafeCluster {
410 fn default() -> Self {
411 Self
412 }
413}
414
415#[async_trait]
416impl Cluster for FailsafeCluster {
417 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
418 Ok(Box::new(FailsafeClusterInvoker { directory }))
419 }
420}
421
422struct FailsafeClusterInvoker {
423 directory: Box<dyn Directory>,
424}
425
426impl Node for FailsafeClusterInvoker {
427 fn get_url(&self) -> &URL {
428 self.directory.get_url()
429 }
430
431 fn is_available(&self) -> bool {
432 true
433 }
434
435 fn destroy(&self) {}
436}
437
438#[async_trait]
439impl Invoker for FailsafeClusterInvoker {
440 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
441 let invokers = match self.directory.list(ctx).await {
442 Ok(invokers) => invokers,
443 Err(e) => {
444 tracing::warn!("failsafe: failed to list invokers: {e}");
445 return Ok(RPCResult::success(vec![]));
446 }
447 };
448
449 if invokers.is_empty() {
450 tracing::warn!("failsafe: no invokers available");
451 return Ok(RPCResult::success(vec![]));
452 }
453
454 match invokers[0].invoke(ctx).await {
455 Ok(result) if !result.is_error() => Ok(result),
456 Ok(result) => {
457 tracing::warn!(
458 "failsafe: invocation failed with error {:?}, swallowing",
459 result.error
460 );
461 Ok(RPCResult::success(vec![]))
462 }
463 Err(e) => {
464 tracing::warn!("failsafe: invocation error: {e}, swallowing");
465 Ok(RPCResult::success(vec![]))
466 }
467 }
468 }
469}
470
471#[allow(dead_code)]
477struct PendingInvocation {
478 method_name: String,
479 arguments: Vec<Vec<u8>>,
480 parameter_types: Vec<String>,
481 retry_count: u32,
482}
483
484pub struct FailbackCluster {
491 retry_delay: std::time::Duration,
492 max_retries: u32,
493}
494
495impl FailbackCluster {
496 #[must_use]
497 pub fn new() -> Self {
498 Self {
499 retry_delay: std::time::Duration::from_secs(5),
500 max_retries: 3,
501 }
502 }
503
504 #[must_use]
505 pub fn with_retry_delay(mut self, delay: std::time::Duration) -> Self {
506 self.retry_delay = delay;
507 self
508 }
509
510 #[must_use]
511 pub fn with_max_retries(mut self, retries: u32) -> Self {
512 self.max_retries = retries;
513 self
514 }
515}
516
517impl Default for FailbackCluster {
518 fn default() -> Self {
519 Self::new()
520 }
521}
522
523#[async_trait]
524impl Cluster for FailbackCluster {
525 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
526 Ok(Box::new(FailbackClusterInvoker {
527 directory,
528 pending: RwLock::new(Vec::new()),
529 retry_delay: self.retry_delay,
530 max_retries: self.max_retries,
531 }))
532 }
533}
534
535struct FailbackClusterInvoker {
536 directory: Box<dyn Directory>,
537 pending: RwLock<Vec<PendingInvocation>>,
538 retry_delay: std::time::Duration,
539 max_retries: u32,
540}
541
542impl Node for FailbackClusterInvoker {
543 fn get_url(&self) -> &URL {
544 self.directory.get_url()
545 }
546
547 fn is_available(&self) -> bool {
548 true
549 }
550
551 fn destroy(&self) {}
552}
553
554#[async_trait]
555impl Invoker for FailbackClusterInvoker {
556 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
557 let invokers = match self.directory.list(ctx).await {
558 Ok(invokers) => invokers,
559 Err(e) => {
560 tracing::warn!("failback: failed to list invokers: {e}");
561 self.record_pending(ctx);
562 return Ok(RPCResult::success(vec![]));
563 }
564 };
565
566 if invokers.is_empty() {
567 tracing::warn!("failback: no invokers available");
568 self.record_pending(ctx);
569 return Ok(RPCResult::success(vec![]));
570 }
571
572 match invokers[0].invoke(ctx).await {
573 Ok(result) if !result.is_error() => Ok(result),
574 Ok(result) => {
575 tracing::warn!(
576 "failback: invocation failed with error {:?}, recording for retry",
577 result.error
578 );
579 self.record_pending(ctx);
580 Ok(RPCResult::success(vec![]))
581 }
582 Err(e) => {
583 tracing::warn!("failback: invocation error: {e}, recording for retry");
584 self.record_pending(ctx);
585 Ok(RPCResult::success(vec![]))
586 }
587 }
588 }
589}
590
591impl FailbackClusterInvoker {
592 fn record_pending(&self, ctx: &InvocationContext) {
593 let pending = PendingInvocation {
594 method_name: ctx.method_name.clone(),
595 arguments: ctx.arguments.clone(),
596 parameter_types: ctx.parameter_types.clone(),
597 retry_count: 0,
598 };
599
600 let retry_delay = self.retry_delay;
601 let max_retries = self.max_retries;
602 let method_name = ctx.method_name.clone();
603
604 {
605 let mut queue = self.pending.write().unwrap();
606 queue.push(pending);
607 }
608
609 tracing::warn!(
610 "failback: scheduling retry in {:?} for method '{}'",
611 retry_delay,
612 method_name
613 );
614 tokio::spawn(async move {
615 tokio::time::sleep(retry_delay).await;
616 tracing::warn!(
617 "failback: retrying method '{}' (would attempt up to {} retries)",
618 method_name,
619 max_retries
620 );
621 });
622 }
623}
624
625pub struct ForkingCluster {
635 forks: usize,
636}
637
638impl ForkingCluster {
639 #[must_use]
640 pub fn new() -> Self {
641 Self { forks: 2 }
642 }
643
644 #[must_use]
645 pub fn with_forks(mut self, forks: usize) -> Self {
646 self.forks = if forks == 0 { 1 } else { forks };
647 self
648 }
649}
650
651impl Default for ForkingCluster {
652 fn default() -> Self {
653 Self::new()
654 }
655}
656
657#[async_trait]
658impl Cluster for ForkingCluster {
659 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
660 Ok(Box::new(ForkingClusterInvoker {
661 directory,
662 forks: self.forks,
663 }))
664 }
665}
666
667struct ForkingClusterInvoker {
668 directory: Box<dyn Directory>,
669 forks: usize,
670}
671
672impl Node for ForkingClusterInvoker {
673 fn get_url(&self) -> &URL {
674 self.directory.get_url()
675 }
676
677 fn is_available(&self) -> bool {
678 true
679 }
680
681 fn destroy(&self) {}
682}
683
684#[async_trait]
685impl Invoker for ForkingClusterInvoker {
686 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
687 let invokers = self
688 .directory
689 .list(ctx)
690 .await
691 .map_err(|e| anyhow::anyhow!("{e}"))?;
692
693 if invokers.is_empty() {
694 return Err(anyhow::anyhow!("no invokers available"));
695 }
696
697 let selected: Vec<Arc<dyn Invoker>> = invokers.into_iter().take(self.forks).collect();
698
699 if selected.len() == 1 {
700 return selected[0].invoke(ctx).await;
701 }
702
703 let (tx, mut rx) =
704 tokio::sync::mpsc::channel::<(usize, Result<RPCResult, anyhow::Error>)>(selected.len());
705
706 for (i, invoker) in selected.into_iter().enumerate() {
707 let tx = tx.clone();
708 let mut fork_ctx = ctx.clone();
709 tokio::spawn(async move {
710 let result = invoker.invoke(&mut fork_ctx).await;
711 let _ = tx.send((i, result)).await;
712 });
713 }
714 drop(tx);
715
716 let total = {
717 let mut count = 0usize;
718 let mut last_error: Option<anyhow::Error> = None;
719 while let Some((_idx, result)) = rx.recv().await {
720 count += 1;
721 match result {
722 Ok(r) if !r.is_error() => return Ok(r),
723 Ok(r) => {
724 last_error = Some(anyhow::anyhow!("{:?}", r.error));
725 }
726 Err(e) => {
727 last_error = Some(e);
728 }
729 }
730 if count >= self.forks {
731 break;
732 }
733 }
734 last_error
735 };
736
737 Err(total.unwrap_or_else(|| anyhow::anyhow!("forking: all forks failed")))
738 }
739}
740
741pub struct BroadcastCluster;
753
754impl Default for BroadcastCluster {
755 fn default() -> Self {
756 Self
757 }
758}
759
760#[async_trait]
761impl Cluster for BroadcastCluster {
762 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
763 Ok(Box::new(BroadcastClusterInvoker { directory }))
764 }
765}
766
767struct BroadcastClusterInvoker {
768 directory: Box<dyn Directory>,
769}
770
771impl Node for BroadcastClusterInvoker {
772 fn get_url(&self) -> &URL {
773 self.directory.get_url()
774 }
775
776 fn is_available(&self) -> bool {
777 true
778 }
779
780 fn destroy(&self) {}
781}
782
783#[async_trait]
784impl Invoker for BroadcastClusterInvoker {
785 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
786 let invokers = self
787 .directory
788 .list(ctx)
789 .await
790 .map_err(|e| anyhow::anyhow!("{e}"))?;
791
792 if invokers.is_empty() {
793 return Err(anyhow::anyhow!("no invokers available"));
794 }
795
796 let mut first_error: Option<RPCError> = None;
797 let mut last_result: Option<RPCResult> = None;
798
799 for invoker in &invokers {
800 match invoker.invoke(ctx).await {
801 Ok(result) if !result.is_error() => {
802 last_result = Some(result);
803 }
804 Ok(result) => {
805 tracing::warn!(
806 "broadcast: invoker {} returned error {:?}",
807 invoker.get_url().get_address(),
808 result.error
809 );
810 if first_error.is_none() {
811 first_error = result.error.clone();
812 }
813 }
814 Err(e) => {
815 tracing::warn!(
816 "broadcast: invoker {} failed: {e}",
817 invoker.get_url().get_address()
818 );
819 if first_error.is_none() {
820 first_error = Some(RPCError::ServerError(format!("{e}")));
821 }
822 }
823 }
824 }
825
826 if let Some(err) = first_error {
827 return Err(anyhow::anyhow!(
828 "broadcast: one or more invokers failed, first error: {err:?}"
829 ));
830 }
831
832 last_result.ok_or_else(|| anyhow::anyhow!("broadcast: no results returned"))
833 }
834}
835
836pub struct AvailableCluster;
848
849impl Default for AvailableCluster {
850 fn default() -> Self {
851 Self
852 }
853}
854
855#[async_trait]
856impl Cluster for AvailableCluster {
857 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
858 Ok(Box::new(AvailableClusterInvoker { directory }))
859 }
860}
861
862struct AvailableClusterInvoker {
863 directory: Box<dyn Directory>,
864}
865
866impl Node for AvailableClusterInvoker {
867 fn get_url(&self) -> &URL {
868 self.directory.get_url()
869 }
870
871 fn is_available(&self) -> bool {
872 true
873 }
874
875 fn destroy(&self) {}
876}
877
878#[async_trait]
879impl Invoker for AvailableClusterInvoker {
880 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
881 let invokers = self
882 .directory
883 .list(ctx)
884 .await
885 .map_err(|e| anyhow::anyhow!("{e}"))?;
886
887 for invoker in &invokers {
888 if invoker.is_available() {
889 return invoker.invoke(ctx).await;
890 }
891 }
892
893 Err(anyhow::anyhow!(
894 "available: no available provider found among {} invokers",
895 invokers.len()
896 ))
897 }
898}
899
900pub struct MockCluster {
918 #[allow(dead_code)]
919 force: bool,
920 #[allow(dead_code)]
921 mock_result: Option<Vec<u8>>,
922}
923
924impl MockCluster {
925 #[must_use]
926 pub fn new() -> Self {
927 Self {
928 force: false,
929 mock_result: None,
930 }
931 }
932
933 #[must_use]
934 pub fn with_force(mut self, force: bool) -> Self {
935 self.force = force;
936 self
937 }
938
939 #[must_use]
940 pub fn with_mock_result(mut self, result: Vec<u8>) -> Self {
941 self.mock_result = Some(result);
942 self
943 }
944}
945
946impl Default for MockCluster {
947 fn default() -> Self {
948 Self::new()
949 }
950}
951
952#[async_trait]
953impl Cluster for MockCluster {
954 async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
955 Ok(Box::new(MockClusterInvoker {
956 directory,
957 force: self.force,
958 mock_result: self.mock_result.clone(),
959 }))
960 }
961}
962
963struct MockClusterInvoker {
964 directory: Box<dyn Directory>,
965 #[allow(dead_code)]
966 force: bool,
967 #[allow(dead_code)]
968 mock_result: Option<Vec<u8>>,
969}
970
971impl Node for MockClusterInvoker {
972 fn get_url(&self) -> &URL {
973 self.directory.get_url()
974 }
975
976 fn is_available(&self) -> bool {
977 true
978 }
979
980 fn destroy(&self) {}
981}
982
983#[async_trait]
984impl Invoker for MockClusterInvoker {
985 async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
986 let attachment_force = ctx.attachments.get("mock").is_some_and(|v| v == "force");
988 let attachment_fail = ctx.attachments.get("mock").is_some_and(|v| v == "fail");
989 let attachment_mock_result = ctx
990 .attachments
991 .get("mock.result")
992 .map(|v| v.as_bytes().to_vec());
993
994 let force = attachment_force || (!attachment_fail && self.force);
995 let mock_result = attachment_mock_result.or_else(|| self.mock_result.clone());
996
997 if force {
998 return Ok(RPCResult::success(mock_result.unwrap_or_default()));
1000 }
1001
1002 let invokers = match self.directory.list(ctx).await {
1003 Ok(invokers) if !invokers.is_empty() => invokers,
1004 _ => return Ok(RPCResult::success(mock_result.unwrap_or_default())),
1005 };
1006
1007 match invokers[0].invoke(ctx).await {
1008 Ok(result) if !result.is_error() => Ok(result),
1009 Ok(err_result) => match mock_result {
1010 Some(data) => Ok(RPCResult::success(data)),
1011 None => Ok(err_result),
1012 },
1013 Err(e) => match mock_result {
1014 Some(data) => Ok(RPCResult::success(data)),
1015 None => Err(e),
1016 },
1017 }
1018 }
1019}
1020
1021pub struct ConditionRouter {
1032 match_rules: Vec<(String, String)>,
1033 filter_rules: Vec<(String, String)>,
1034}
1035
1036impl ConditionRouter {
1037 #[must_use]
1040 pub fn parse(rule: &str) -> Option<Self> {
1041 let parts: Vec<&str> = rule.splitn(2, "=>").collect();
1042 let lhs = parts[0].trim();
1043 let rhs = parts.get(1).map_or("", |s| s.trim());
1044
1045 if parts.len() == 1 {
1047 return Some(Self {
1048 match_rules: Vec::new(),
1049 filter_rules: Self::parse_kv_pairs(lhs),
1050 });
1051 }
1052
1053 Some(Self {
1054 match_rules: Self::parse_kv_pairs(lhs),
1055 filter_rules: Self::parse_kv_pairs(rhs),
1056 })
1057 }
1058
1059 fn parse_kv_pairs(s: &str) -> Vec<(String, String)> {
1060 if s.is_empty() {
1061 return Vec::new();
1062 }
1063 s.split(',')
1064 .filter_map(|kv| {
1065 let mut it = kv.splitn(2, '=');
1066 let k = it.next()?.trim();
1067 let v = it.next()?.trim();
1068 if k.is_empty() {
1069 None
1070 } else {
1071 Some((k.to_string(), v.to_string()))
1072 }
1073 })
1074 .collect()
1075 }
1076
1077 #[must_use]
1079 pub fn matches_invocation(&self, ctx: &InvocationContext) -> bool {
1080 if self.match_rules.is_empty() {
1081 return true;
1082 }
1083 self.match_rules
1084 .iter()
1085 .all(|(k, v)| ctx.attachments.get(k).is_some_and(|val| val == v))
1086 }
1087
1088 #[must_use]
1090 pub fn filter_invokers(&self, invokers: &[Arc<dyn Invoker>]) -> Vec<usize> {
1091 invokers
1092 .iter()
1093 .enumerate()
1094 .filter(|(_, inv)| {
1095 self.filter_rules
1096 .iter()
1097 .all(|(k, v)| inv.get_url().get_param(k).is_some_and(|val| val == v))
1098 })
1099 .map(|(i, _)| i)
1100 .collect()
1101 }
1102}
1103
1104pub struct TagRouter {
1110 tag_key: String,
1111}
1112
1113impl TagRouter {
1114 #[must_use]
1115 pub fn new() -> Self {
1116 Self {
1117 tag_key: "dubbo.tag".to_string(),
1118 }
1119 }
1120
1121 #[must_use]
1122 pub fn with_tag_key(mut self, key: impl Into<String>) -> Self {
1123 self.tag_key = key.into();
1124 self
1125 }
1126
1127 #[must_use]
1134 pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1135 let requested_tag = ctx.attachments.get(&self.tag_key);
1136
1137 let Some(tag) = requested_tag else {
1138 return (0..invokers.len()).collect();
1139 };
1140
1141 let tagged: Vec<usize> = invokers
1142 .iter()
1143 .enumerate()
1144 .filter(|(_, inv)| inv.get_url().get_param("tag").is_some_and(|t| t == tag))
1145 .map(|(i, _)| i)
1146 .collect();
1147
1148 if !tagged.is_empty() {
1149 return tagged;
1150 }
1151
1152 invokers
1153 .iter()
1154 .enumerate()
1155 .filter(|(_, inv)| inv.get_url().get_param("tag").is_none())
1156 .map(|(i, _)| i)
1157 .collect()
1158 }
1159}
1160
1161impl Default for TagRouter {
1162 fn default() -> Self {
1163 Self::new()
1164 }
1165}
1166
1167pub struct ScriptRouter {
1188 script: String,
1189 compiled: rhai::AST,
1190}
1191
1192impl ScriptRouter {
1193 pub fn new(script: &str) -> Result<Self, Box<rhai::EvalAltResult>> {
1199 let mut engine = rhai::Engine::new();
1200 engine.set_max_expr_depths(64, 64);
1201 engine.set_max_operations(1000);
1202 engine.set_max_string_size(1024);
1203 engine.set_max_array_size(256);
1204
1205 let compiled = engine.compile(script)?;
1206 Ok(Self {
1207 script: script.to_string(),
1208 compiled,
1209 })
1210 }
1211
1212 #[must_use]
1213 #[allow(
1214 clippy::cast_possible_wrap,
1215 clippy::cast_possible_truncation,
1216 clippy::cast_sign_loss
1217 )]
1218 pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1219 let mut engine = rhai::Engine::new();
1220 engine.set_max_expr_depths(64, 64);
1221 engine.set_max_operations(1000);
1222 engine.set_max_string_size(1024);
1223 engine.set_max_array_size(256);
1224
1225 let invokers_len = invokers.len();
1226 let invoker_arcs: Vec<Arc<dyn Invoker>> = invokers.to_vec();
1227 let method = ctx.method_name.clone();
1228 let attachments: std::collections::HashMap<String, String> = ctx.attachments.clone();
1229
1230 engine.register_fn("invoker_count", move || -> i64 { invokers_len as i64 });
1231
1232 let arcs = invoker_arcs.clone();
1233 engine.register_fn("invoker_ip", move |index: i64| -> String {
1234 let idx = index as usize;
1235 arcs.get(idx)
1236 .map(|a| a.get_url().ip.clone())
1237 .unwrap_or_default()
1238 });
1239
1240 let arcs = invoker_arcs.clone();
1241 engine.register_fn(
1242 "invoker_has_param",
1243 move |index: i64, key: String| -> bool {
1244 let idx = index as usize;
1245 arcs.get(idx)
1246 .is_some_and(|a| a.get_url().get_param(&key).is_some())
1247 },
1248 );
1249
1250 let arcs = invoker_arcs;
1251 engine.register_fn(
1252 "invoker_get_param",
1253 move |index: i64, key: String| -> String {
1254 let idx = index as usize;
1255 arcs.get(idx)
1256 .and_then(|a| a.get_url().get_param(&key).cloned())
1257 .unwrap_or_default()
1258 },
1259 );
1260
1261 engine.register_fn("method_name", move || -> String { method.clone() });
1262
1263 let att = attachments.clone();
1264 engine.register_fn("has_attachment", move |key: String| -> bool {
1265 att.contains_key(&key)
1266 });
1267
1268 engine.register_fn("get_attachment", move |key: String| -> String {
1269 attachments.get(&key).cloned().unwrap_or_default()
1270 });
1271
1272 let mut scope = rhai::Scope::new();
1273 let result = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, &self.compiled);
1274
1275 match result {
1276 Ok(dynamic) => {
1277 if let Some(arr) = dynamic.clone().try_cast::<rhai::Array>() {
1278 arr.into_iter()
1279 .filter_map(|v| v.try_cast::<i64>().map(|i| i as usize))
1280 .filter(|&i| i < invokers_len)
1281 .collect()
1282 } else {
1283 (0..invokers_len).collect()
1284 }
1285 }
1286 Err(_) => (0..invokers_len).collect(),
1287 }
1288 }
1289
1290 #[must_use]
1291 pub fn script(&self) -> &str {
1292 &self.script
1293 }
1294}
1295
1296pub struct RouterChain {
1301 condition_routers: Vec<ConditionRouter>,
1302 tag_router: Option<TagRouter>,
1303 script_router: Option<ScriptRouter>,
1304}
1305
1306impl RouterChain {
1307 #[must_use]
1308 pub fn new() -> Self {
1309 Self {
1310 condition_routers: Vec::new(),
1311 tag_router: None,
1312 script_router: None,
1313 }
1314 }
1315
1316 pub fn add_condition_router(&mut self, router: ConditionRouter) {
1317 self.condition_routers.push(router);
1318 }
1319
1320 pub fn set_tag_router(&mut self, router: TagRouter) {
1321 self.tag_router = Some(router);
1322 }
1323
1324 #[must_use]
1325 pub fn with_condition_router(mut self, router: ConditionRouter) -> Self {
1326 self.condition_routers.push(router);
1327 self
1328 }
1329
1330 #[must_use]
1331 pub fn with_tag_router(mut self, router: TagRouter) -> Self {
1332 self.tag_router = Some(router);
1333 self
1334 }
1335
1336 #[must_use]
1337 pub fn with_script_router(mut self, router: ScriptRouter) -> Self {
1338 self.script_router = Some(router);
1339 self
1340 }
1341
1342 pub fn set_script_router(&mut self, router: ScriptRouter) {
1343 self.script_router = Some(router);
1344 }
1345
1346 #[must_use]
1350 pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1351 let mut current: Vec<usize> = (0..invokers.len()).collect();
1352
1353 for cr in &self.condition_routers {
1354 if cr.matches_invocation(ctx) {
1355 let filtered = cr.filter_invokers(invokers);
1356 current.retain(|i| filtered.contains(i));
1357 if current.is_empty() {
1358 return current;
1359 }
1360 }
1361 }
1362
1363 if let Some(ref tr) = self.tag_router {
1364 let filtered = tr.route(invokers, ctx);
1365 current.retain(|i| filtered.contains(i));
1366 }
1367
1368 if let Some(ref sr) = self.script_router {
1369 let filtered = sr.route(invokers, ctx);
1370 current.retain(|i| filtered.contains(i));
1371 }
1372
1373 current
1374 }
1375}
1376
1377impl Default for RouterChain {
1378 fn default() -> Self {
1379 Self::new()
1380 }
1381}
1382
1383#[cfg(test)]
1384mod tests {
1385 use super::*;
1386 use std::sync::Arc;
1387
1388 fn make_url(host: &str, port: &str, path: &str) -> URL {
1389 let mut url = URL::new("tri", path);
1390 url.ip = host.to_string();
1391 url.port = port.to_string();
1392 url
1393 }
1394
1395 #[test]
1396 fn test_static_directory_empty_returns_error() {
1397 let dir = StaticDirectory::new(URL::new("tri", "/com.example.Service"));
1398 let _ctx = InvocationContext::new("sayHello", URL::new("tri", "/com.example.Service"));
1399
1400 assert_eq!(dir.invoker_count(), 0);
1401 assert_eq!(dir.get_url().path, "/com.example.Service");
1402 }
1403
1404 #[test]
1405 fn test_registry_directory_new_is_empty() {
1406 let dir = RegistryDirectory::new(URL::new("tri", "/com.example.Service"));
1407 assert_eq!(dir.invoker_count(), 0);
1408 assert_eq!(dir.get_url().path, "/com.example.Service");
1409 }
1410
1411 #[test]
1412 fn test_registry_directory_refresh_invokers() {
1413 let dir = RegistryDirectory::new(URL::new("tri", "/com.example.Service"));
1414 let providers = vec![
1415 make_url("192.168.1.1", "50051", "/com.example.Service"),
1416 make_url("192.168.1.2", "50051", "/com.example.Service"),
1417 ];
1418 dir.refresh_invokers(&providers);
1419 assert_eq!(dir.invoker_count(), 2);
1420 }
1421
1422 #[test]
1423 fn test_registry_directory_notify_add() {
1424 let dir = Arc::new(RegistryDirectory::new(URL::new(
1425 "tri",
1426 "/com.example.Service",
1427 )));
1428 let providers = vec![make_url("192.168.1.1", "50051", "/com.example.Service")];
1429
1430 dir.refresh_invokers(&providers);
1431 assert_eq!(dir.invoker_count(), 1);
1432 }
1433
1434 #[test]
1435 fn test_registry_directory_notify_remove() {
1436 let dir = Arc::new(RegistryDirectory::new(URL::new(
1437 "tri",
1438 "/com.example.Service",
1439 )));
1440 let initial = vec![
1441 make_url("192.168.1.1", "50051", "/com.example.Service"),
1442 make_url("192.168.1.2", "50051", "/com.example.Service"),
1443 ];
1444 dir.refresh_invokers(&initial);
1445 assert_eq!(dir.invoker_count(), 2);
1446
1447 let after_remove = vec![make_url("192.168.1.1", "50051", "/com.example.Service")];
1448 dir.refresh_invokers(&after_remove);
1449 assert_eq!(dir.invoker_count(), 1);
1450 }
1451
1452 struct MockInvoker {
1453 url: URL,
1454 succeed: bool,
1455 call_count: std::sync::atomic::AtomicUsize,
1456 }
1457
1458 impl MockInvoker {
1459 fn new(url: URL, succeed: bool) -> Self {
1460 Self {
1461 url,
1462 succeed,
1463 call_count: std::sync::atomic::AtomicUsize::new(0),
1464 }
1465 }
1466 }
1467
1468 impl Node for MockInvoker {
1469 fn get_url(&self) -> &URL {
1470 &self.url
1471 }
1472 fn is_available(&self) -> bool {
1473 true
1474 }
1475 fn destroy(&self) {}
1476 }
1477
1478 #[async_trait]
1479 impl Invoker for MockInvoker {
1480 async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
1481 self.call_count
1482 .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1483 if self.succeed {
1484 Ok(RPCResult::success(b"mock_response".to_vec()))
1485 } else {
1486 Ok(RPCResult::from_error(RPCError::ServerError(
1487 "mock failure".into(),
1488 )))
1489 }
1490 }
1491 }
1492
1493 struct MockDirectory {
1494 url: URL,
1495 invokers: Vec<Arc<dyn Invoker>>,
1496 }
1497
1498 impl MockDirectory {
1499 fn new(invokers: Vec<Arc<dyn Invoker>>) -> Self {
1500 Self {
1501 url: URL::new("tri", "/mock"),
1502 invokers,
1503 }
1504 }
1505 }
1506
1507 #[async_trait]
1508 impl Directory for MockDirectory {
1509 async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
1510 if self.invokers.is_empty() {
1511 return Err(RPCError::ServiceNotFound("no invokers".into()));
1512 }
1513 Ok(self.invokers.iter().map(Arc::clone).collect())
1514 }
1515
1516 fn get_url(&self) -> &URL {
1517 &self.url
1518 }
1519 }
1520
1521 #[tokio::test]
1522 async fn test_failfast_cluster_success() {
1523 let invoker = Arc::new(MockInvoker::new(
1524 make_url("192.168.1.1", "50051", "/svc"),
1525 true,
1526 ));
1527 let directory = Box::new(MockDirectory::new(vec![invoker]));
1528 let cluster = FailfastCluster;
1529 let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1530 }
1531
1532 #[tokio::test]
1533 async fn test_failfast_cluster_empty_directory() {
1534 let directory = Box::new(MockDirectory::new(vec![]));
1535 let cluster = FailfastCluster;
1536 let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1537 }
1538
1539 #[tokio::test]
1540 async fn test_failover_cluster_creation() {
1541 let invokers: Vec<Arc<dyn Invoker>> = (0..3)
1542 .map(|i| {
1543 Arc::new(MockInvoker::new(
1544 make_url("192.168.1.1", &format!("5005{i}"), "/svc"),
1545 true,
1546 )) as Arc<dyn Invoker>
1547 })
1548 .collect();
1549 let directory = Box::new(MockDirectory::new(invokers));
1550 let cluster = FailoverCluster::new().with_retries(3);
1551 let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1552 }
1553
1554 #[tokio::test]
1555 async fn test_failover_cluster_default_retries() {
1556 let cluster = FailoverCluster::new();
1557 assert_eq!(cluster.retries, 2);
1558 }
1559
1560 #[tokio::test]
1561 async fn test_provider_invoker_invoke_returns_error() {
1562 let provider_url = make_url("192.168.1.1", "50051", "/com.example.Service");
1563 let invoker = ProviderInvoker {
1564 provider_url: provider_url.clone(),
1565 };
1566 let mut ctx = InvocationContext::new("sayHello", provider_url);
1567 let result = invoker.invoke(&mut ctx).await;
1568 assert!(result.is_err());
1569 let err_msg = result.unwrap_err().to_string();
1570 assert!(
1571 err_msg.contains("no protocol invoker"),
1572 "expected message about missing invoker factory, got: {err_msg}"
1573 );
1574 }
1575
1576 #[tokio::test]
1577 async fn test_static_directory_list_filters_unavailable() {
1578 use std::sync::atomic::{AtomicBool, Ordering};
1579
1580 struct AvailabilityInvoker {
1581 url: URL,
1582 available: AtomicBool,
1583 }
1584
1585 impl Node for AvailabilityInvoker {
1586 fn get_url(&self) -> &URL {
1587 &self.url
1588 }
1589 fn is_available(&self) -> bool {
1590 self.available.load(Ordering::SeqCst)
1591 }
1592 fn destroy(&self) {}
1593 }
1594
1595 #[async_trait]
1596 impl Invoker for AvailabilityInvoker {
1597 async fn invoke(
1598 &self,
1599 _ctx: &mut InvocationContext,
1600 ) -> Result<RPCResult, anyhow::Error> {
1601 Ok(RPCResult::success(b"ok".to_vec()))
1602 }
1603 }
1604
1605 let dir = StaticDirectory::new(URL::new("tri", "/com.example.Service"));
1606
1607 let inv1 = Arc::new(AvailabilityInvoker {
1608 url: make_url("192.168.1.1", "50051", "/svc"),
1609 available: AtomicBool::new(true),
1610 });
1611 let inv2 = Arc::new(AvailabilityInvoker {
1612 url: make_url("192.168.1.2", "50051", "/svc"),
1613 available: AtomicBool::new(false),
1614 });
1615 let inv3 = Arc::new(AvailabilityInvoker {
1616 url: make_url("192.168.1.3", "50051", "/svc"),
1617 available: AtomicBool::new(true),
1618 });
1619
1620 dir.add_invoker(inv1);
1621 dir.add_invoker(inv2);
1622 dir.add_invoker(inv3);
1623
1624 assert_eq!(dir.invoker_count(), 3);
1625
1626 let ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1627 let available = dir.list(&ctx).await.expect("list should succeed");
1628 assert_eq!(
1629 available.len(),
1630 2,
1631 "only available invokers should be returned"
1632 );
1633 }
1634
1635 #[test]
1636 fn test_registry_directory_with_invoker_factory() {
1637 let dir = RegistryDirectory::new(URL::new("tri", "/com.example.Service"))
1638 .with_invoker_factory(|url| {
1639 let invoker: Box<dyn Invoker> = Box::new(MockInvoker::new(url.clone(), true));
1640 Ok(invoker)
1641 });
1642 let providers = vec![
1643 make_url("192.168.1.1", "50051", "/com.example.Service"),
1644 make_url("192.168.1.2", "50051", "/com.example.Service"),
1645 make_url("192.168.1.3", "50051", "/com.example.Service"),
1646 ];
1647 dir.refresh_invokers(&providers);
1648 assert_eq!(dir.invoker_count(), 3);
1649 }
1650
1651 #[test]
1652 fn test_failover_cluster_retries_count() {
1653 let cluster = FailoverCluster::new().with_retries(5);
1654 assert_eq!(cluster.retries, 5);
1655 }
1656
1657 #[tokio::test]
1658 async fn test_failfast_cluster_join_with_empty_directory() {
1659 let directory = Box::new(MockDirectory::new(vec![]));
1660 let cluster = FailfastCluster;
1661 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1662
1663 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1664 let result = cluster_invoker.invoke(&mut ctx).await;
1665 assert!(result.is_err(), "invoke on empty directory should fail");
1666 }
1667
1668 #[tokio::test]
1669 async fn test_failover_cluster_invoker_retries_on_failure() {
1670 let inv1 = Arc::new(MockInvoker::new(
1671 make_url("192.168.1.1", "50051", "/svc"),
1672 false,
1673 ));
1674 let inv2 = Arc::new(MockInvoker::new(
1675 make_url("192.168.1.2", "50051", "/svc"),
1676 false,
1677 ));
1678 let inv3 = Arc::new(MockInvoker::new(
1679 make_url("192.168.1.3", "50051", "/svc"),
1680 false,
1681 ));
1682
1683 let call1 = Arc::clone(&inv1);
1684 let call2 = Arc::clone(&inv2);
1685 let call3 = Arc::clone(&inv3);
1686
1687 let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
1688 let cluster = FailoverCluster::new().with_retries(0);
1689 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1690
1691 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1692 let result = cluster_invoker.invoke(&mut ctx).await;
1693 assert!(result.is_err(), "all invokers fail, should return error");
1694
1695 assert_eq!(
1697 call1.call_count.load(std::sync::atomic::Ordering::SeqCst),
1698 1,
1699 "invoker 1 should be called once"
1700 );
1701 assert_eq!(
1702 call2.call_count.load(std::sync::atomic::Ordering::SeqCst),
1703 1,
1704 "invoker 2 should be called once"
1705 );
1706 assert_eq!(
1707 call3.call_count.load(std::sync::atomic::Ordering::SeqCst),
1708 1,
1709 "invoker 3 should be called once"
1710 );
1711 }
1712
1713 #[tokio::test]
1718 async fn test_failsafe_cluster_all_success() {
1719 let invoker = Arc::new(MockInvoker::new(
1720 make_url("192.168.1.1", "50051", "/svc"),
1721 true,
1722 ));
1723 let directory = Box::new(MockDirectory::new(vec![invoker]));
1724 let cluster = FailsafeCluster;
1725 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1726
1727 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1728 let result = cluster_invoker.invoke(&mut ctx).await;
1729 assert!(result.is_ok(), "failsafe should always return Ok");
1730 let rpc = result.unwrap();
1731 assert!(
1732 !rpc.is_error(),
1733 "successful invocation should not have error"
1734 );
1735 assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1736 }
1737
1738 #[tokio::test]
1739 async fn test_failsafe_cluster_all_failing() {
1740 let invoker = Arc::new(MockInvoker::new(
1741 make_url("192.168.1.1", "50051", "/svc"),
1742 false,
1743 ));
1744 let directory = Box::new(MockDirectory::new(vec![invoker]));
1745 let cluster = FailsafeCluster;
1746 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1747
1748 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1749 let result = cluster_invoker.invoke(&mut ctx).await;
1750 assert!(result.is_ok(), "failsafe should swallow errors");
1751 let rpc = result.unwrap();
1752 assert!(!rpc.is_error(), "failsafe returns empty success on failure");
1753 assert_eq!(rpc.value, Some(vec![]));
1754 }
1755
1756 #[tokio::test]
1757 async fn test_failsafe_cluster_empty_directory() {
1758 let directory = Box::new(MockDirectory::new(vec![]));
1759 let cluster = FailsafeCluster;
1760 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1761
1762 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1763 let result = cluster_invoker.invoke(&mut ctx).await;
1764 assert!(result.is_ok(), "failsafe should handle empty directory");
1765 let rpc = result.unwrap();
1766 assert!(!rpc.is_error());
1767 assert_eq!(rpc.value, Some(vec![]));
1768 }
1769
1770 #[tokio::test]
1771 async fn test_failsafe_cluster_default() {
1772 let cluster = FailsafeCluster;
1773 let _ = cluster;
1774 }
1775
1776 #[tokio::test]
1781 async fn test_failback_cluster_all_success() {
1782 let invoker = Arc::new(MockInvoker::new(
1783 make_url("192.168.1.1", "50051", "/svc"),
1784 true,
1785 ));
1786 let directory = Box::new(MockDirectory::new(vec![invoker]));
1787 let cluster = FailbackCluster::new();
1788 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1789
1790 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1791 let result = cluster_invoker.invoke(&mut ctx).await;
1792 assert!(result.is_ok());
1793 let rpc = result.unwrap();
1794 assert!(!rpc.is_error());
1795 assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1796 }
1797
1798 #[tokio::test]
1799 async fn test_failback_cluster_failure_returns_empty_success() {
1800 let invoker = Arc::new(MockInvoker::new(
1801 make_url("192.168.1.1", "50051", "/svc"),
1802 false,
1803 ));
1804 let directory = Box::new(MockDirectory::new(vec![invoker]));
1805 let cluster = FailbackCluster::new();
1806 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1807
1808 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1809 let result = cluster_invoker.invoke(&mut ctx).await;
1810 assert!(result.is_ok(), "failback should return Ok on failure");
1811 let rpc = result.unwrap();
1812 assert!(!rpc.is_error(), "failback returns empty success");
1813 assert_eq!(rpc.value, Some(vec![]));
1814 }
1815
1816 #[tokio::test]
1817 async fn test_failback_cluster_empty_directory() {
1818 let directory = Box::new(MockDirectory::new(vec![]));
1819 let cluster = FailbackCluster::new();
1820 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1821
1822 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1823 let result = cluster_invoker.invoke(&mut ctx).await;
1824 assert!(result.is_ok());
1825 let rpc = result.unwrap();
1826 assert!(!rpc.is_error());
1827 assert_eq!(rpc.value, Some(vec![]));
1828 }
1829
1830 #[test]
1831 fn test_failback_cluster_default_config() {
1832 let cluster = FailbackCluster::new();
1833 assert_eq!(cluster.max_retries, 3);
1834 assert_eq!(cluster.retry_delay, std::time::Duration::from_secs(5));
1835 }
1836
1837 #[test]
1838 fn test_failback_cluster_custom_config() {
1839 let cluster = FailbackCluster::new()
1840 .with_retry_delay(std::time::Duration::from_secs(10))
1841 .with_max_retries(5);
1842 assert_eq!(cluster.retry_delay, std::time::Duration::from_secs(10));
1843 assert_eq!(cluster.max_retries, 5);
1844 }
1845
1846 #[tokio::test]
1851 async fn test_forking_cluster_all_success() {
1852 let inv1 = Arc::new(MockInvoker::new(
1853 make_url("192.168.1.1", "50051", "/svc"),
1854 true,
1855 ));
1856 let inv2 = Arc::new(MockInvoker::new(
1857 make_url("192.168.1.2", "50051", "/svc"),
1858 true,
1859 ));
1860 let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1861 let cluster = ForkingCluster::new();
1862 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1863
1864 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1865 let result = cluster_invoker.invoke(&mut ctx).await;
1866 assert!(result.is_ok());
1867 let rpc = result.unwrap();
1868 assert!(!rpc.is_error());
1869 assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1870 }
1871
1872 #[tokio::test]
1873 async fn test_forking_cluster_all_fail() {
1874 let inv1 = Arc::new(MockInvoker::new(
1875 make_url("192.168.1.1", "50051", "/svc"),
1876 false,
1877 ));
1878 let inv2 = Arc::new(MockInvoker::new(
1879 make_url("192.168.1.2", "50051", "/svc"),
1880 false,
1881 ));
1882 let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1883 let cluster = ForkingCluster::new();
1884 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1885
1886 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1887 let result = cluster_invoker.invoke(&mut ctx).await;
1888 assert!(result.is_err(), "all forks failed should return error");
1889 }
1890
1891 #[tokio::test]
1892 async fn test_forking_cluster_mixed() {
1893 let inv1 = Arc::new(MockInvoker::new(
1894 make_url("192.168.1.1", "50051", "/svc"),
1895 false,
1896 ));
1897 let inv2 = Arc::new(MockInvoker::new(
1898 make_url("192.168.1.2", "50051", "/svc"),
1899 true,
1900 ));
1901 let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1902 let cluster = ForkingCluster::new();
1903 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1904
1905 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1906 let result = cluster_invoker.invoke(&mut ctx).await;
1907 assert!(
1908 result.is_ok(),
1909 "should succeed with at least one successful fork"
1910 );
1911 let rpc = result.unwrap();
1912 assert!(!rpc.is_error());
1913 }
1914
1915 #[tokio::test]
1916 async fn test_forking_cluster_empty_directory() {
1917 let directory = Box::new(MockDirectory::new(vec![]));
1918 let cluster = ForkingCluster::new();
1919 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1920
1921 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1922 let result = cluster_invoker.invoke(&mut ctx).await;
1923 assert!(result.is_err(), "empty directory should return error");
1924 }
1925
1926 #[test]
1927 fn test_forking_cluster_default_forks() {
1928 let cluster = ForkingCluster::new();
1929 assert_eq!(cluster.forks, 2);
1930 }
1931
1932 #[test]
1933 fn test_forking_cluster_custom_forks() {
1934 let cluster = ForkingCluster::new().with_forks(5);
1935 assert_eq!(cluster.forks, 5);
1936 }
1937
1938 #[test]
1939 fn test_forking_cluster_zero_forks_clamped() {
1940 let cluster = ForkingCluster::new().with_forks(0);
1941 assert_eq!(cluster.forks, 1, "zero forks should be clamped to 1");
1942 }
1943
1944 #[tokio::test]
1949 async fn test_broadcast_cluster_all_success() {
1950 let inv1 = Arc::new(MockInvoker::new(
1951 make_url("192.168.1.1", "50051", "/svc"),
1952 true,
1953 ));
1954 let inv2 = Arc::new(MockInvoker::new(
1955 make_url("192.168.1.2", "50051", "/svc"),
1956 true,
1957 ));
1958 let inv3 = Arc::new(MockInvoker::new(
1959 make_url("192.168.1.3", "50051", "/svc"),
1960 true,
1961 ));
1962 let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
1963 let cluster = BroadcastCluster;
1964 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1965
1966 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1967 let result = cluster_invoker.invoke(&mut ctx).await;
1968 assert!(result.is_ok());
1969 let rpc = result.unwrap();
1970 assert!(!rpc.is_error());
1971 }
1972
1973 #[tokio::test]
1974 async fn test_broadcast_cluster_all_fail() {
1975 let inv1 = Arc::new(MockInvoker::new(
1976 make_url("192.168.1.1", "50051", "/svc"),
1977 false,
1978 ));
1979 let inv2 = Arc::new(MockInvoker::new(
1980 make_url("192.168.1.2", "50051", "/svc"),
1981 false,
1982 ));
1983 let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1984 let cluster = BroadcastCluster;
1985 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1986
1987 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1988 let result = cluster_invoker.invoke(&mut ctx).await;
1989 assert!(result.is_err(), "all invokers failing should return error");
1990 }
1991
1992 #[tokio::test]
1993 async fn test_broadcast_cluster_mixed() {
1994 let inv1 = Arc::new(MockInvoker::new(
1995 make_url("192.168.1.1", "50051", "/svc"),
1996 true,
1997 ));
1998 let inv2 = Arc::new(MockInvoker::new(
1999 make_url("192.168.1.2", "50051", "/svc"),
2000 false,
2001 ));
2002 let inv3 = Arc::new(MockInvoker::new(
2003 make_url("192.168.1.3", "50051", "/svc"),
2004 true,
2005 ));
2006 let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2007 let cluster = BroadcastCluster;
2008 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2009
2010 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2011 let result = cluster_invoker.invoke(&mut ctx).await;
2012 assert!(
2013 result.is_err(),
2014 "any failure in broadcast should return error"
2015 );
2016 }
2017
2018 #[tokio::test]
2019 async fn test_broadcast_cluster_empty_directory() {
2020 let directory = Box::new(MockDirectory::new(vec![]));
2021 let cluster = BroadcastCluster;
2022 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2023
2024 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2025 let result = cluster_invoker.invoke(&mut ctx).await;
2026 assert!(result.is_err(), "empty directory should return error");
2027 }
2028
2029 #[tokio::test]
2030 async fn test_broadcast_cluster_default() {
2031 let cluster = BroadcastCluster;
2032 let _ = cluster;
2033 }
2034
2035 struct AvailableMockInvoker {
2040 url: URL,
2041 available: bool,
2042 succeed: bool,
2043 }
2044
2045 impl AvailableMockInvoker {
2046 fn new(url: URL, available: bool, succeed: bool) -> Self {
2047 Self {
2048 url,
2049 available,
2050 succeed,
2051 }
2052 }
2053 }
2054
2055 impl Node for AvailableMockInvoker {
2056 fn get_url(&self) -> &URL {
2057 &self.url
2058 }
2059 fn is_available(&self) -> bool {
2060 self.available
2061 }
2062 fn destroy(&self) {}
2063 }
2064
2065 #[async_trait]
2066 impl Invoker for AvailableMockInvoker {
2067 async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
2068 if self.succeed {
2069 Ok(RPCResult::success(self.url.ip.as_bytes().to_vec()))
2070 } else {
2071 Ok(RPCResult::from_error(RPCError::ServerError(
2072 "mock failure".into(),
2073 )))
2074 }
2075 }
2076 }
2077
2078 #[tokio::test]
2079 async fn test_available_cluster_first_available() {
2080 let inv1 = Arc::new(AvailableMockInvoker::new(
2081 make_url("192.168.1.1", "50051", "/svc"),
2082 false,
2083 true,
2084 ));
2085 let inv2 = Arc::new(AvailableMockInvoker::new(
2086 make_url("192.168.1.2", "50051", "/svc"),
2087 false,
2088 true,
2089 ));
2090 let inv3 = Arc::new(AvailableMockInvoker::new(
2091 make_url("192.168.1.3", "50051", "/svc"),
2092 true,
2093 true,
2094 ));
2095
2096 let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2097 let cluster = AvailableCluster;
2098 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2099
2100 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2101 let result = cluster_invoker.invoke(&mut ctx).await;
2102 assert!(result.is_ok());
2103 let rpc = result.unwrap();
2104 assert!(!rpc.is_error());
2105 assert_eq!(rpc.value, Some(b"192.168.1.3".to_vec()));
2106 }
2107
2108 #[tokio::test]
2109 async fn test_available_cluster_all_available() {
2110 let inv1 = Arc::new(AvailableMockInvoker::new(
2111 make_url("192.168.1.1", "50051", "/svc"),
2112 true,
2113 true,
2114 ));
2115 let inv2 = Arc::new(AvailableMockInvoker::new(
2116 make_url("192.168.1.2", "50051", "/svc"),
2117 true,
2118 true,
2119 ));
2120 let inv3 = Arc::new(AvailableMockInvoker::new(
2121 make_url("192.168.1.3", "50051", "/svc"),
2122 true,
2123 true,
2124 ));
2125
2126 let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2127 let cluster = AvailableCluster;
2128 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2129
2130 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2131 let result = cluster_invoker.invoke(&mut ctx).await;
2132 assert!(result.is_ok());
2133 let rpc = result.unwrap();
2134 assert!(!rpc.is_error());
2135 assert_eq!(
2136 rpc.value,
2137 Some(b"192.168.1.1".to_vec()),
2138 "should invoke on the first available invoker"
2139 );
2140 }
2141
2142 #[tokio::test]
2143 async fn test_available_cluster_none_available() {
2144 let inv1 = Arc::new(AvailableMockInvoker::new(
2145 make_url("192.168.1.1", "50051", "/svc"),
2146 false,
2147 true,
2148 ));
2149 let inv2 = Arc::new(AvailableMockInvoker::new(
2150 make_url("192.168.1.2", "50051", "/svc"),
2151 false,
2152 true,
2153 ));
2154
2155 let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
2156 let cluster = AvailableCluster;
2157 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2158
2159 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2160 let result = cluster_invoker.invoke(&mut ctx).await;
2161 assert!(result.is_err(), "no available invoker should return error");
2162 }
2163
2164 #[tokio::test]
2165 async fn test_available_cluster_empty_directory() {
2166 let directory = Box::new(MockDirectory::new(vec![]));
2167 let cluster = AvailableCluster;
2168 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2169
2170 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2171 let result = cluster_invoker.invoke(&mut ctx).await;
2172 assert!(result.is_err(), "empty directory should return error");
2173 }
2174
2175 #[tokio::test]
2176 async fn test_available_cluster_default_trait() {
2177 let cluster = AvailableCluster;
2178 let directory = Box::new(MockDirectory::new(vec![]));
2179 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2180
2181 assert!(cluster_invoker.is_available());
2182 assert_eq!(cluster_invoker.get_url().path, "/mock");
2183 }
2184
2185 #[tokio::test]
2190 async fn test_mock_cluster_force_returns_mock_result() {
2191 let invoker = Arc::new(MockInvoker::new(
2192 make_url("192.168.1.1", "50051", "/svc"),
2193 true,
2194 ));
2195 let tracked = Arc::clone(&invoker);
2196 let directory = Box::new(MockDirectory::new(vec![invoker]));
2197 let cluster = MockCluster::new()
2198 .with_force(true)
2199 .with_mock_result(b"mock_data".to_vec());
2200 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2201
2202 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2203 let result = cluster_invoker.invoke(&mut ctx).await;
2204 assert!(result.is_ok());
2205 let rpc = result.unwrap();
2206 assert!(!rpc.is_error());
2207 assert_eq!(rpc.value, Some(b"mock_data".to_vec()));
2208 assert_eq!(
2209 tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2210 0,
2211 "force mode should not call downstream"
2212 );
2213 }
2214
2215 #[tokio::test]
2216 async fn test_mock_cluster_force_default_empty_result() {
2217 let invoker = Arc::new(MockInvoker::new(
2218 make_url("192.168.1.1", "50051", "/svc"),
2219 true,
2220 ));
2221 let directory = Box::new(MockDirectory::new(vec![invoker]));
2222 let cluster = MockCluster::new().with_force(true);
2223 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2224
2225 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2226 let result = cluster_invoker.invoke(&mut ctx).await;
2227 assert!(result.is_ok());
2228 let rpc = result.unwrap();
2229 assert!(!rpc.is_error());
2230 assert_eq!(rpc.value, Some(vec![]));
2231 }
2232
2233 #[tokio::test]
2234 async fn test_mock_cluster_fail_on_success() {
2235 let invoker = Arc::new(MockInvoker::new(
2236 make_url("192.168.1.1", "50051", "/svc"),
2237 true,
2238 ));
2239 let directory = Box::new(MockDirectory::new(vec![invoker]));
2240 let cluster = MockCluster::new().with_mock_result(b"fallback".to_vec());
2241 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2242
2243 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2244 let result = cluster_invoker.invoke(&mut ctx).await;
2245 assert!(result.is_ok());
2246 let rpc = result.unwrap();
2247 assert!(!rpc.is_error());
2248 assert_eq!(
2249 rpc.value,
2250 Some(b"mock_response".to_vec()),
2251 "fail mode should return real result on success"
2252 );
2253 }
2254
2255 #[tokio::test]
2256 async fn test_mock_cluster_fail_on_failure() {
2257 let invoker = Arc::new(MockInvoker::new(
2258 make_url("192.168.1.1", "50051", "/svc"),
2259 false,
2260 ));
2261 let directory = Box::new(MockDirectory::new(vec![invoker]));
2262 let cluster = MockCluster::new().with_mock_result(b"fallback_data".to_vec());
2263 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2264
2265 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2266 let result = cluster_invoker.invoke(&mut ctx).await;
2267 assert!(result.is_ok());
2268 let rpc = result.unwrap();
2269 assert!(!rpc.is_error());
2270 assert_eq!(
2271 rpc.value,
2272 Some(b"fallback_data".to_vec()),
2273 "fail mode should return mock on failure"
2274 );
2275 }
2276
2277 #[tokio::test]
2278 async fn test_mock_cluster_fail_no_mock_configured() {
2279 let invoker = Arc::new(MockInvoker::new(
2280 make_url("192.168.1.1", "50051", "/svc"),
2281 false,
2282 ));
2283 let directory = Box::new(MockDirectory::new(vec![invoker]));
2284 let cluster = MockCluster::new();
2285 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2286
2287 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2288 let result = cluster_invoker.invoke(&mut ctx).await;
2289 assert!(
2290 result.is_ok(),
2291 "should return Ok with RPCResult error when no mock configured"
2292 );
2293 let rpc = result.unwrap();
2294 assert!(
2295 rpc.is_error(),
2296 "should propagate error result without mock fallback"
2297 );
2298 }
2299
2300 #[tokio::test]
2301 async fn test_mock_cluster_force_via_attachment() {
2302 let invoker = Arc::new(MockInvoker::new(
2303 make_url("192.168.1.1", "50051", "/svc"),
2304 true,
2305 ));
2306 let tracked = Arc::clone(&invoker);
2307 let directory = Box::new(MockDirectory::new(vec![invoker]));
2308 let cluster = MockCluster::new().with_mock_result(b"att_mock".to_vec());
2309 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2310
2311 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2312 ctx.attachments.insert("mock".into(), "force".into());
2313 let result = cluster_invoker.invoke(&mut ctx).await;
2314 assert!(result.is_ok());
2315 let rpc = result.unwrap();
2316 assert!(!rpc.is_error());
2317 assert_eq!(rpc.value, Some(b"att_mock".to_vec()));
2318 assert_eq!(
2319 tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2320 0,
2321 "attachment force should not call downstream"
2322 );
2323 }
2324
2325 #[tokio::test]
2326 async fn test_mock_cluster_fail_via_attachment() {
2327 let invoker = Arc::new(MockInvoker::new(
2328 make_url("192.168.1.1", "50051", "/svc"),
2329 false,
2330 ));
2331 let directory = Box::new(MockDirectory::new(vec![invoker]));
2332 let cluster = MockCluster::new().with_mock_result(b"att_fallback".to_vec());
2333 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2334
2335 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2336 ctx.attachments.insert("mock".into(), "fail".into());
2337 let result = cluster_invoker.invoke(&mut ctx).await;
2338 assert!(result.is_ok());
2339 let rpc = result.unwrap();
2340 assert!(!rpc.is_error());
2341 assert_eq!(
2342 rpc.value,
2343 Some(b"att_fallback".to_vec()),
2344 "attachment fail should return mock on failure"
2345 );
2346 }
2347
2348 #[tokio::test]
2349 async fn test_mock_cluster_mock_result_via_attachment() {
2350 let invoker = Arc::new(MockInvoker::new(
2351 make_url("192.168.1.1", "50051", "/svc"),
2352 true,
2353 ));
2354 let tracked = Arc::clone(&invoker);
2355 let directory = Box::new(MockDirectory::new(vec![invoker]));
2356 let cluster = MockCluster::new().with_force(true);
2357 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2358
2359 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2360 ctx.attachments
2361 .insert("mock.result".into(), "from_attachment".into());
2362 let result = cluster_invoker.invoke(&mut ctx).await;
2363 assert!(result.is_ok());
2364 let rpc = result.unwrap();
2365 assert!(!rpc.is_error());
2366 assert_eq!(
2367 rpc.value,
2368 Some(b"from_attachment".to_vec()),
2369 "mock.result attachment should override mock data"
2370 );
2371 assert_eq!(
2372 tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2373 0
2374 );
2375 }
2376
2377 #[tokio::test]
2378 async fn test_mock_cluster_empty_directory() {
2379 let directory = Box::new(MockDirectory::new(vec![]));
2380 let cluster = MockCluster::new()
2381 .with_force(true)
2382 .with_mock_result(b"mock".to_vec());
2383 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2384
2385 let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2386 let result = cluster_invoker.invoke(&mut ctx).await;
2387 assert!(result.is_ok());
2388 let rpc = result.unwrap();
2389 assert!(!rpc.is_error());
2390 assert_eq!(
2391 rpc.value,
2392 Some(b"mock".to_vec()),
2393 "force mode returns mock even with empty directory"
2394 );
2395 }
2396
2397 #[tokio::test]
2398 async fn test_mock_cluster_join_creates_invoker() {
2399 let invoker = Arc::new(MockInvoker::new(
2400 make_url("192.168.1.1", "50051", "/svc"),
2401 true,
2402 ));
2403 let directory = Box::new(MockDirectory::new(vec![invoker]));
2404 let cluster = MockCluster::new().with_force(true);
2405 let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2406
2407 assert!(cluster_invoker.is_available());
2408 assert_eq!(cluster_invoker.get_url().path, "/mock");
2409 }
2410
2411 #[test]
2412 fn test_mock_cluster_default() {
2413 let cluster = MockCluster::default();
2414 assert!(!cluster.force);
2415 assert!(cluster.mock_result.is_none());
2416 }
2417
2418 #[test]
2419 fn test_mock_cluster_builder() {
2420 let cluster = MockCluster::new()
2421 .with_force(true)
2422 .with_mock_result(b"test".to_vec());
2423 assert!(cluster.force);
2424 assert_eq!(cluster.mock_result, Some(b"test".to_vec()));
2425 }
2426}
2427
2428#[cfg(test)]
2430mod router_tests {
2431 use super::*;
2432
2433 fn make_url_with_params(host: &str, params: &[(&str, &str)]) -> URL {
2434 let mut url = URL::new("tri", "/com.example.Service");
2435 url.ip = host.to_string();
2436 for (k, v) in params {
2437 url.set_param(*k, *v);
2438 }
2439 url
2440 }
2441
2442 struct RouterTestInvoker {
2443 url: URL,
2444 }
2445
2446 impl RouterTestInvoker {
2447 fn new(url: URL) -> Self {
2448 Self { url }
2449 }
2450 }
2451
2452 impl Node for RouterTestInvoker {
2453 fn get_url(&self) -> &URL {
2454 &self.url
2455 }
2456 fn is_available(&self) -> bool {
2457 true
2458 }
2459 fn destroy(&self) {}
2460 }
2461
2462 #[async_trait::async_trait]
2463 impl Invoker for RouterTestInvoker {
2464 async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
2465 Ok(RPCResult::success(b"ok".to_vec()))
2466 }
2467 }
2468
2469 fn make_invokers(params_list: &[&[(&str, &str)]]) -> Vec<Arc<dyn Invoker>> {
2470 params_list
2471 .iter()
2472 .enumerate()
2473 .map(|(i, params)| {
2474 let host = format!("192.168.1.{}", i + 1);
2475 Arc::new(RouterTestInvoker::new(make_url_with_params(&host, params)))
2476 as Arc<dyn Invoker>
2477 })
2478 .collect()
2479 }
2480
2481 fn make_ctx() -> InvocationContext {
2482 let mut url = URL::new("tri", "/com.example.Service");
2483 url.ip = "127.0.0.1".into();
2484 InvocationContext::new("sayHello", url)
2485 }
2486
2487 #[test]
2490 fn test_condition_router_parse_simple() {
2491 let r = ConditionRouter::parse("env=gray").unwrap();
2492 assert!(r.match_rules.is_empty());
2493 assert_eq!(r.filter_rules.len(), 1);
2494 assert_eq!(r.filter_rules[0], ("env".to_string(), "gray".to_string()));
2495 }
2496
2497 #[test]
2498 fn test_condition_router_parse_with_match() {
2499 let r = ConditionRouter::parse("region=beijing => env=gray").unwrap();
2500 assert_eq!(r.match_rules.len(), 1);
2501 assert_eq!(
2502 r.match_rules[0],
2503 ("region".to_string(), "beijing".to_string())
2504 );
2505 assert_eq!(r.filter_rules[0], ("env".to_string(), "gray".to_string()));
2506 }
2507
2508 #[test]
2509 fn test_condition_router_always_matches_empty() {
2510 let r = ConditionRouter::parse("=> env=gray").unwrap();
2511 let ctx = make_ctx();
2512 assert!(r.matches_invocation(&ctx));
2513 }
2514
2515 #[test]
2516 fn test_condition_router_matches_with_attachment() {
2517 let r = ConditionRouter::parse("region=beijing => env=gray").unwrap();
2518 let mut ctx = make_ctx();
2519 ctx.attachments.insert("region".into(), "beijing".into());
2520 assert!(r.matches_invocation(&ctx));
2521 }
2522
2523 #[test]
2524 fn test_condition_router_no_match_wrong_value() {
2525 let r = ConditionRouter::parse("region=beijing => env=gray").unwrap();
2526 let mut ctx = make_ctx();
2527 ctx.attachments.insert("region".into(), "shanghai".into());
2528 assert!(!r.matches_invocation(&ctx));
2529 }
2530
2531 #[test]
2532 fn test_condition_router_filters_by_param() {
2533 let invokers = make_invokers(&[&[("env", "gray")], &[("env", "prod")], &[("env", "gray")]]);
2534 let r = ConditionRouter::parse("=> env=gray").unwrap();
2535 let filtered = r.filter_invokers(&invokers);
2536 assert_eq!(filtered, vec![0, 2]);
2537 }
2538
2539 #[test]
2542 fn test_tag_router_all_when_no_tag_requested() {
2543 let invokers = make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[]]);
2544 let ctx = make_ctx();
2545 let tr = TagRouter::default();
2546 let result = tr.route(&invokers, &ctx);
2547 assert_eq!(result, vec![0, 1, 2]);
2548 }
2549
2550 #[test]
2551 fn test_tag_router_matches_requested_tag() {
2552 let invokers = make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[("tag", "v1")]]);
2553 let mut ctx = make_ctx();
2554 ctx.attachments.insert("dubbo.tag".into(), "v2".into());
2555 let tr = TagRouter::default();
2556 let result = tr.route(&invokers, &ctx);
2557 assert_eq!(result, vec![1]);
2558 }
2559
2560 #[test]
2561 fn test_tag_router_fallback_to_untagged() {
2562 let invokers = make_invokers(&[&[("tag", "v1")], &[]]);
2563 let mut ctx = make_ctx();
2564 ctx.attachments.insert("dubbo.tag".into(), "v2".into());
2565 let tr = TagRouter::default();
2566 let result = tr.route(&invokers, &ctx);
2567 assert_eq!(result, vec![1]);
2568 }
2569
2570 #[test]
2573 fn test_router_chain_empty_returns_all() {
2574 let invokers = make_invokers(&[&[], &[]]);
2575 let chain = RouterChain::default();
2576 let result = chain.route(&invokers, &make_ctx());
2577 assert_eq!(result, vec![0, 1]);
2578 }
2579
2580 #[test]
2581 fn test_router_chain_condition_then_tag() {
2582 let invokers = make_invokers(&[
2583 &[("env", "gray"), ("tag", "v1")],
2584 &[("env", "gray"), ("tag", "v2")],
2585 &[("env", "prod"), ("tag", "v1")],
2586 ]);
2587 let mut ctx = make_ctx();
2588 ctx.attachments.insert("dubbo.tag".into(), "v1".into());
2589
2590 let chain = RouterChain::new()
2591 .with_condition_router(ConditionRouter::parse("=> env=gray").unwrap())
2592 .with_tag_router(TagRouter::default());
2593
2594 let result = chain.route(&invokers, &ctx);
2595 assert_eq!(result, vec![0]);
2596 }
2597
2598 #[test]
2599 fn test_condition_router_parse_empty_filter() {
2600 let r = ConditionRouter::parse("region=beijing =>").unwrap();
2601 assert_eq!(r.match_rules.len(), 1);
2602 assert_eq!(
2603 r.match_rules[0],
2604 ("region".to_string(), "beijing".to_string())
2605 );
2606 assert!(r.filter_rules.is_empty());
2607 }
2608
2609 #[test]
2610 fn test_condition_router_parse_kv_pairs_directly() {
2611 let r = ConditionRouter::parse("a=1,b=2").unwrap();
2612 assert!(r.match_rules.is_empty());
2613 assert_eq!(r.filter_rules.len(), 2);
2614 assert_eq!(r.filter_rules[0], ("a".to_string(), "1".to_string()));
2615 assert_eq!(r.filter_rules[1], ("b".to_string(), "2".to_string()));
2616 }
2617
2618 #[test]
2619 fn test_condition_router_filter_no_match() {
2620 let invokers = make_invokers(&[
2621 &[("env", "prod")],
2622 &[("env", "staging")],
2623 &[("env", "prod")],
2624 ]);
2625 let r = ConditionRouter::parse("=> zone=us-west").unwrap();
2626 let filtered = r.filter_invokers(&invokers);
2627 assert!(
2628 filtered.is_empty(),
2629 "no invoker has 'zone' param, result should be empty"
2630 );
2631 }
2632
2633 #[test]
2634 fn test_tag_router_with_custom_key() {
2635 let invokers = make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[]]);
2636 let mut ctx = make_ctx();
2637 ctx.attachments.insert("custom-key".into(), "v1".into());
2638
2639 let tr = TagRouter::new().with_tag_key("custom-key");
2640 let result = tr.route(&invokers, &ctx);
2641 assert_eq!(result, vec![0]);
2642 }
2643
2644 #[test]
2645 fn test_router_chain_empty_after_condition() {
2646 let invokers = make_invokers(&[&[("env", "prod")], &[("env", "prod")]]);
2647 let chain = RouterChain::new()
2648 .with_condition_router(ConditionRouter::parse("=> env=gray").unwrap());
2649 let result = chain.route(&invokers, &make_ctx());
2650 assert!(
2651 result.is_empty(),
2652 "condition router removes all invokers, chain should return empty"
2653 );
2654 }
2655}
2656
2657#[cfg(test)]
2658mod script_router_tests {
2659 use super::*;
2660
2661 fn make_url_with_params(host: &str, params: &[(&str, &str)]) -> URL {
2662 let mut url = URL::new("tri", "/com.example.Service");
2663 url.ip = host.to_string();
2664 for (k, v) in params {
2665 url.set_param(*k, *v);
2666 }
2667 url
2668 }
2669
2670 struct TestInvoker {
2671 url: URL,
2672 }
2673
2674 impl TestInvoker {
2675 fn new(url: URL) -> Self {
2676 Self { url }
2677 }
2678 }
2679
2680 impl Node for TestInvoker {
2681 fn get_url(&self) -> &URL {
2682 &self.url
2683 }
2684 fn is_available(&self) -> bool {
2685 true
2686 }
2687 fn destroy(&self) {}
2688 }
2689
2690 #[async_trait::async_trait]
2691 impl Invoker for TestInvoker {
2692 async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
2693 Ok(RPCResult::success(b"ok".to_vec()))
2694 }
2695 }
2696
2697 fn make_invokers(params_list: &[&[(&str, &str)]]) -> Vec<Arc<dyn Invoker>> {
2698 params_list
2699 .iter()
2700 .enumerate()
2701 .map(|(i, params)| {
2702 let host = format!("10.0.0.{}", i + 1);
2703 Arc::new(TestInvoker::new(make_url_with_params(&host, params))) as Arc<dyn Invoker>
2704 })
2705 .collect()
2706 }
2707
2708 fn make_ctx() -> InvocationContext {
2709 let mut url = URL::new("tri", "/com.example.Service");
2710 url.ip = "127.0.0.1".into();
2711 InvocationContext::new("sayHello", url)
2712 }
2713
2714 #[test]
2715 fn test_script_router_basic_filter() {
2716 let router = ScriptRouter::new("[0, 2]").unwrap();
2717 let invokers = make_invokers(&[&[], &[], &[]]);
2718 let ctx = make_ctx();
2719 let result = router.route(&invokers, &ctx);
2720 assert_eq!(result, vec![0, 2]);
2721 }
2722
2723 #[test]
2724 fn test_script_router_select_by_ip() {
2725 let script = r#"
2726 let result = [];
2727 for i in 0..invoker_count() {
2728 if invoker_ip(i).starts_with("10.0.0.1") {
2729 result.push(i);
2730 }
2731 }
2732 result
2733 "#;
2734 let router = ScriptRouter::new(script).unwrap();
2735 let invokers = make_invokers(&[&[], &[], &[]]);
2736 let ctx = make_ctx();
2737 let result = router.route(&invokers, &ctx);
2738 assert_eq!(result, vec![0]);
2739 }
2740
2741 #[test]
2742 fn test_script_router_select_by_param() {
2743 let script = r#"
2744 let result = [];
2745 for i in 0..invoker_count() {
2746 if invoker_has_param(i, "env") && invoker_get_param(i, "env") == "gray" {
2747 result.push(i);
2748 }
2749 }
2750 result
2751 "#;
2752 let router = ScriptRouter::new(script).unwrap();
2753 let invokers = make_invokers(&[&[("env", "gray")], &[("env", "prod")], &[("env", "gray")]]);
2754 let ctx = make_ctx();
2755 let result = router.route(&invokers, &ctx);
2756 assert_eq!(result, vec![0, 2]);
2757 }
2758
2759 #[test]
2760 fn test_script_router_method_name() {
2761 let script = r#"
2762 if method_name() == "sayHello" {
2763 [0, 1]
2764 } else {
2765 []
2766 }
2767 "#;
2768 let router = ScriptRouter::new(script).unwrap();
2769 let invokers = make_invokers(&[&[], &[]]);
2770 let ctx = make_ctx();
2771 let result = router.route(&invokers, &ctx);
2772 assert_eq!(result, vec![0, 1]);
2773
2774 let mut url = URL::new("tri", "/com.example.Service");
2775 url.ip = "127.0.0.1".into();
2776 let ctx2 = InvocationContext::new("byebye", url);
2777 let result2 = router.route(&invokers, &ctx2);
2778 assert!(result2.is_empty());
2779 }
2780
2781 #[test]
2782 fn test_script_router_attachment_based() {
2783 let script = r#"
2784 if has_attachment("region") && get_attachment("region") == "beijing" {
2785 [1]
2786 } else {
2787 [0]
2788 }
2789 "#;
2790 let router = ScriptRouter::new(script).unwrap();
2791 let invokers = make_invokers(&[&[], &[]]);
2792
2793 let mut ctx = make_ctx();
2794 ctx.attachments.insert("region".into(), "beijing".into());
2795 let result = router.route(&invokers, &ctx);
2796 assert_eq!(result, vec![1]);
2797
2798 let ctx_no_att = make_ctx();
2799 let result2 = router.route(&invokers, &ctx_no_att);
2800 assert_eq!(result2, vec![0]);
2801 }
2802
2803 #[test]
2804 fn test_script_router_invalid_script() {
2805 let result = ScriptRouter::new("fn (");
2806 assert!(result.is_err(), "bad script should fail to compile");
2807 }
2808
2809 #[test]
2810 fn test_script_router_out_of_bounds() {
2811 let router = ScriptRouter::new("[99, 200]").unwrap();
2812 let invokers = make_invokers(&[&[], &[]]);
2813 let ctx = make_ctx();
2814 let result = router.route(&invokers, &ctx);
2815 assert!(
2816 result.is_empty(),
2817 "out-of-bounds indices should produce empty result"
2818 );
2819 }
2820
2821 #[test]
2822 fn test_script_router_chain_integration() {
2823 let script = r#"
2824 let result = [];
2825 for i in 0..invoker_count() {
2826 if invoker_has_param(i, "env") && invoker_get_param(i, "env") == "gray" {
2827 result.push(i);
2828 }
2829 }
2830 result
2831 "#;
2832 let sr = ScriptRouter::new(script).unwrap();
2833 let invokers = make_invokers(&[
2834 &[("env", "gray"), ("tag", "v1")],
2835 &[("env", "prod")],
2836 &[("env", "gray"), ("tag", "v2")],
2837 ]);
2838 let mut ctx = make_ctx();
2839 ctx.attachments.insert("dubbo.tag".into(), "v1".into());
2840
2841 let chain = RouterChain::new()
2842 .with_tag_router(TagRouter::default())
2843 .with_script_router(sr);
2844 let result = chain.route(&invokers, &ctx);
2845 assert_eq!(result, vec![0]);
2846 }
2847}