Skip to main content

dubbo_rs_cluster/
lib.rs

1#![allow(
2    clippy::doc_markdown,
3    clippy::missing_panics_doc,
4    clippy::assigning_clones
5)]
6
7pub use dubbo_rs_common;
8pub use dubbo_rs_protocol;
9pub use dubbo_rs_registry;
10
11use std::sync::{Arc, RwLock};
12
13use async_trait::async_trait;
14use dubbo_rs_common::error::RPCError;
15use dubbo_rs_common::node::Node;
16use dubbo_rs_common::url::URL;
17use dubbo_rs_protocol::{InvocationContext, Invoker, RPCResult};
18use dubbo_rs_registry::{NotifyListener, ServiceEvent};
19
20/// Directory provides a list of service invokers.
21///
22/// Directories may be static (pre-configured invoker list) or dynamic
23/// (backed by a registry that updates the list as providers change).
24#[async_trait]
25pub trait Directory: Send + Sync {
26    /// Return all invokers available for the given invocation context.
27    ///
28    /// Called by the cluster strategy before each call to select an invoker.
29    ///
30    /// # Errors
31    ///
32    /// Returns `RPCError::ServiceNotFound` if no invokers are available.
33    async fn list(&self, ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError>;
34
35    /// Returns the URL identifying this directory.
36    fn get_url(&self) -> &URL;
37}
38
39/// Cluster strategy — joins a directory into a single fault-tolerant invoker.
40///
41/// A cluster wraps a directory of invokers with a retry/failover policy.
42/// The resulting invoker presents itself as a single endpoint to callers,
43/// while internally selecting from the directory and retrying on failure
44/// according to the cluster policy (e.g., failover, failfast).
45#[async_trait]
46pub trait Cluster: Send + Sync {
47    /// Join a directory into a single invoker decorated with cluster logic.
48    ///
49    /// # Errors
50    ///
51    /// Returns `RPCError` if the join fails (e.g., directory is empty).
52    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError>;
53}
54
55/// `StaticDirectory` holds a fixed list of invokers.
56///
57/// Used for direct-connect mode where provider addresses are known ahead
58/// of time and do not change at runtime.
59pub struct StaticDirectory {
60    url: URL,
61    invokers: RwLock<Vec<Arc<dyn Invoker>>>,
62}
63
64impl StaticDirectory {
65    #[must_use]
66    pub fn new(url: URL) -> Self {
67        Self {
68            url,
69            invokers: RwLock::new(Vec::new()),
70        }
71    }
72
73    pub fn add_invoker(&self, invoker: Arc<dyn Invoker>) {
74        self.invokers.write().unwrap().push(invoker);
75    }
76
77    #[must_use]
78    pub fn invoker_count(&self) -> usize {
79        self.invokers.read().unwrap().len()
80    }
81}
82
83#[async_trait]
84impl Directory for StaticDirectory {
85    async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
86        let invokers = self.invokers.read().unwrap();
87        if invokers.is_empty() {
88            return Err(RPCError::ServiceNotFound(format!(
89                "no invokers available for {}",
90                self.url.path
91            )));
92        }
93
94        Ok(invokers
95            .iter()
96            .filter(|i| i.is_available())
97            .map(Arc::clone)
98            .collect())
99    }
100
101    fn get_url(&self) -> &URL {
102        &self.url
103    }
104}
105
106type InvokerFactory = Box<dyn Fn(&URL) -> Result<Box<dyn Invoker>, RPCError> + Send + Sync>;
107
108pub struct RegistryDirectory {
109    service_url: URL,
110    invokers: RwLock<Vec<Arc<dyn Invoker>>>,
111    provider_urls: RwLock<Vec<URL>>,
112    invoker_factory: Option<InvokerFactory>,
113}
114
115impl RegistryDirectory {
116    #[must_use]
117    pub fn new(service_url: URL) -> Self {
118        Self {
119            service_url,
120            invokers: RwLock::new(Vec::new()),
121            provider_urls: RwLock::new(Vec::new()),
122            invoker_factory: None,
123        }
124    }
125
126    #[must_use]
127    pub fn with_invoker_factory<F>(mut self, factory: F) -> Self
128    where
129        F: Fn(&URL) -> Result<Box<dyn Invoker>, RPCError> + Send + Sync + 'static,
130    {
131        self.invoker_factory = Some(Box::new(factory));
132        self
133    }
134
135    /// Update the invoker list from registry events.
136    pub fn refresh_invokers(&self, provider_urls: &[URL]) {
137        let mut invokers: Vec<Arc<dyn Invoker>> = Vec::new();
138        for url in provider_urls {
139            if let Some(ref factory) = self.invoker_factory {
140                match factory(url) {
141                    Ok(inv) => invokers.push(Arc::from(inv)),
142                    Err(e) => {
143                        tracing::warn!("failed to create invoker for {}: {e}", url.get_address());
144                    }
145                }
146            } else {
147                invokers.push(Arc::new(ProviderInvoker {
148                    provider_url: url.clone(),
149                }));
150            }
151        }
152        let mut guard = self.invokers.write().unwrap();
153        *guard = invokers;
154
155        let mut urls = self.provider_urls.write().unwrap();
156        *urls = provider_urls.to_vec();
157    }
158
159    #[must_use]
160    pub fn invoker_count(&self) -> usize {
161        self.invokers.read().unwrap().len()
162    }
163}
164
165#[async_trait]
166impl Directory for RegistryDirectory {
167    async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
168        let invokers = self.invokers.read().unwrap();
169        if invokers.is_empty() {
170            return Err(RPCError::ServiceNotFound(format!(
171                "no providers registered for {}",
172                self.service_url.path
173            )));
174        }
175
176        Ok(invokers
177            .iter()
178            .filter(|i| i.is_available())
179            .map(Arc::clone)
180            .collect())
181    }
182
183    fn get_url(&self) -> &URL {
184        &self.service_url
185    }
186}
187
188#[async_trait]
189impl NotifyListener for RegistryDirectory {
190    async fn notify(&self, event: ServiceEvent) {
191        match event {
192            ServiceEvent::Add(urls) | ServiceEvent::Update(urls) => {
193                let mut all = self.provider_urls.read().unwrap().clone();
194                for url in &urls {
195                    if !all.iter().any(|u| u.get_address() == url.get_address()) {
196                        all.push(url.clone());
197                    }
198                }
199                self.refresh_invokers(&all);
200            }
201            ServiceEvent::Remove(urls) => {
202                let all: Vec<URL> = self
203                    .provider_urls
204                    .read()
205                    .unwrap()
206                    .iter()
207                    .filter(|u| !urls.iter().any(|r| r.get_address() == u.get_address()))
208                    .cloned()
209                    .collect();
210                self.refresh_invokers(&all);
211            }
212        }
213    }
214
215    fn listen_url(&self) -> URL {
216        self.service_url.clone()
217    }
218}
219
220/// Lightweight invoker wrapping a provider URL.
221///
222/// Used when no invoker factory is configured. Returns an error
223/// indicating that a protocol-specific invoker factory is needed.
224struct ProviderInvoker {
225    provider_url: URL,
226}
227
228impl Node for ProviderInvoker {
229    fn get_url(&self) -> &URL {
230        &self.provider_url
231    }
232
233    fn is_available(&self) -> bool {
234        true
235    }
236
237    fn destroy(&self) {}
238}
239
240#[async_trait]
241impl Invoker for ProviderInvoker {
242    async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
243        Err(anyhow::anyhow!(
244            "ProviderInvoker for {} has no protocol invoker — \
245             configure an invoker factory via RegistryDirectory::with_invoker_factory()",
246            self.provider_url.get_address()
247        ))
248    }
249}
250
/// Cluster strategy that retries a failed invocation.
///
/// `retries` is the number of additional attempts made after the first one.
pub struct FailoverCluster {
    retries: u32,
}

impl FailoverCluster {
    /// Create a failover cluster with the default of 2 retries.
    #[must_use]
    pub fn new() -> Self {
        let retries = 2;
        Self { retries }
    }

    /// Replace the retry count and return the updated cluster.
    #[must_use]
    pub fn with_retries(self, retries: u32) -> Self {
        let mut cluster = self;
        cluster.retries = retries;
        cluster
    }
}

impl Default for FailoverCluster {
    /// Same as [`FailoverCluster::new`].
    fn default() -> Self {
        FailoverCluster::new()
    }
}
273
274#[async_trait]
275impl Cluster for FailoverCluster {
276    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
277        Ok(Box::new(FailoverClusterInvoker {
278            directory,
279            retries: self.retries,
280        }))
281    }
282}
283
284struct FailoverClusterInvoker {
285    directory: Box<dyn Directory>,
286    retries: u32,
287}
288
289impl Node for FailoverClusterInvoker {
290    fn get_url(&self) -> &URL {
291        self.directory.get_url()
292    }
293
294    fn is_available(&self) -> bool {
295        true
296    }
297
298    fn destroy(&self) {}
299}
300
301#[async_trait]
302impl Invoker for FailoverClusterInvoker {
303    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
304        let mut last_error: Option<RPCError> = None;
305
306        for attempt in 0..=self.retries {
307            let invokers = self
308                .directory
309                .list(ctx)
310                .await
311                .map_err(|e| anyhow::anyhow!("{e}"))?;
312
313            if invokers.is_empty() {
314                return Err(anyhow::anyhow!("no invokers available"));
315            }
316
317            for invoker in &invokers {
318                match invoker.invoke(ctx).await {
319                    Ok(result) if !result.is_error() => return Ok(result),
320                    Ok(result) => {
321                        last_error = result.error.clone();
322                        tracing::warn!(
323                            "failover: attempt {}/{} failed with error {:?}",
324                            attempt + 1,
325                            self.retries + 1,
326                            last_error
327                        );
328                    }
329                    Err(e) => {
330                        last_error = Some(RPCError::ServerError(format!("{e}")));
331                        tracing::warn!(
332                            "failover: attempt {}/{} failed: {}",
333                            attempt + 1,
334                            self.retries + 1,
335                            e
336                        );
337                    }
338                }
339            }
340        }
341
342        Err(anyhow::anyhow!(
343            "failover: all {} attempts failed. last error: {:?}",
344            self.retries + 1,
345            last_error
346        ))
347    }
348}
349
/// Cluster strategy that makes a single call and reports its outcome as-is.
///
/// The invoker it produces calls the first invoker listed by the directory
/// and never retries.
#[derive(Default)]
pub struct FailfastCluster;
357
358#[async_trait]
359impl Cluster for FailfastCluster {
360    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
361        Ok(Box::new(FailfastClusterInvoker { directory }))
362    }
363}
364
365struct FailfastClusterInvoker {
366    directory: Box<dyn Directory>,
367}
368
369impl Node for FailfastClusterInvoker {
370    fn get_url(&self) -> &URL {
371        self.directory.get_url()
372    }
373
374    fn is_available(&self) -> bool {
375        true
376    }
377
378    fn destroy(&self) {}
379}
380
381#[async_trait]
382impl Invoker for FailfastClusterInvoker {
383    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
384        let invokers = self
385            .directory
386            .list(ctx)
387            .await
388            .map_err(|e| anyhow::anyhow!("{e}"))?;
389
390        if invokers.is_empty() {
391            return Err(anyhow::anyhow!("no invokers available"));
392        }
393
394        invokers[0].invoke(ctx).await
395    }
396}
397
398// ============================================================================
399// FailsafeCluster
400// ============================================================================
401
/// `FailsafeCluster` — when an invocation fails, silently swallow the error
/// and return an empty success result.
///
/// Used for non-critical operations like logging/auditing where failures
/// should not propagate to the caller.
#[derive(Default)]
pub struct FailsafeCluster;
414
415#[async_trait]
416impl Cluster for FailsafeCluster {
417    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
418        Ok(Box::new(FailsafeClusterInvoker { directory }))
419    }
420}
421
422struct FailsafeClusterInvoker {
423    directory: Box<dyn Directory>,
424}
425
426impl Node for FailsafeClusterInvoker {
427    fn get_url(&self) -> &URL {
428        self.directory.get_url()
429    }
430
431    fn is_available(&self) -> bool {
432        true
433    }
434
435    fn destroy(&self) {}
436}
437
438#[async_trait]
439impl Invoker for FailsafeClusterInvoker {
440    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
441        let invokers = match self.directory.list(ctx).await {
442            Ok(invokers) => invokers,
443            Err(e) => {
444                tracing::warn!("failsafe: failed to list invokers: {e}");
445                return Ok(RPCResult::success(vec![]));
446            }
447        };
448
449        if invokers.is_empty() {
450            tracing::warn!("failsafe: no invokers available");
451            return Ok(RPCResult::success(vec![]));
452        }
453
454        match invokers[0].invoke(ctx).await {
455            Ok(result) if !result.is_error() => Ok(result),
456            Ok(result) => {
457                tracing::warn!(
458                    "failsafe: invocation failed with error {:?}, swallowing",
459                    result.error
460                );
461                Ok(RPCResult::success(vec![]))
462            }
463            Err(e) => {
464                tracing::warn!("failsafe: invocation error: {e}, swallowing");
465                Ok(RPCResult::success(vec![]))
466            }
467        }
468    }
469}
470
471// ============================================================================
472// FailbackCluster
473// ============================================================================
474
/// Snapshot of a failed invocation kept for a later background retry.
#[allow(dead_code)]
struct PendingInvocation {
    /// Name of the method that was invoked.
    method_name: String,
    /// Parameter type names, copied from the invocation context.
    parameter_types: Vec<String>,
    /// Argument payloads as byte buffers, copied from the invocation context.
    arguments: Vec<Vec<u8>>,
    /// Number of retries performed so far.
    retry_count: u32,
}
483
/// `FailbackCluster` — a failed invocation is reported to the caller as an
/// empty success straight away, and the failure is handed to a background
/// retry that waits a configurable delay.
///
/// Suited to operations that should eventually succeed but must not block
/// the caller, such as message sending or event recording.
pub struct FailbackCluster {
    /// Delay before a background retry.
    retry_delay: std::time::Duration,
    /// Upper bound on background retries per failed invocation.
    max_retries: u32,
}

impl FailbackCluster {
    /// Create a cluster with a 5 second retry delay and at most 3 retries.
    #[must_use]
    pub fn new() -> Self {
        let retry_delay = std::time::Duration::from_secs(5);
        let max_retries = 3;
        Self {
            retry_delay,
            max_retries,
        }
    }

    /// Replace the retry delay and return the updated cluster.
    #[must_use]
    pub fn with_retry_delay(self, delay: std::time::Duration) -> Self {
        Self {
            retry_delay: delay,
            ..self
        }
    }

    /// Replace the retry limit and return the updated cluster.
    #[must_use]
    pub fn with_max_retries(self, retries: u32) -> Self {
        Self {
            max_retries: retries,
            ..self
        }
    }
}

impl Default for FailbackCluster {
    /// Same as [`FailbackCluster::new`].
    fn default() -> Self {
        FailbackCluster::new()
    }
}
522
523#[async_trait]
524impl Cluster for FailbackCluster {
525    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
526        Ok(Box::new(FailbackClusterInvoker {
527            directory,
528            pending: RwLock::new(Vec::new()),
529            retry_delay: self.retry_delay,
530            max_retries: self.max_retries,
531        }))
532    }
533}
534
535struct FailbackClusterInvoker {
536    directory: Box<dyn Directory>,
537    pending: RwLock<Vec<PendingInvocation>>,
538    retry_delay: std::time::Duration,
539    max_retries: u32,
540}
541
542impl Node for FailbackClusterInvoker {
543    fn get_url(&self) -> &URL {
544        self.directory.get_url()
545    }
546
547    fn is_available(&self) -> bool {
548        true
549    }
550
551    fn destroy(&self) {}
552}
553
554#[async_trait]
555impl Invoker for FailbackClusterInvoker {
556    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
557        let invokers = match self.directory.list(ctx).await {
558            Ok(invokers) => invokers,
559            Err(e) => {
560                tracing::warn!("failback: failed to list invokers: {e}");
561                self.record_pending(ctx);
562                return Ok(RPCResult::success(vec![]));
563            }
564        };
565
566        if invokers.is_empty() {
567            tracing::warn!("failback: no invokers available");
568            self.record_pending(ctx);
569            return Ok(RPCResult::success(vec![]));
570        }
571
572        match invokers[0].invoke(ctx).await {
573            Ok(result) if !result.is_error() => Ok(result),
574            Ok(result) => {
575                tracing::warn!(
576                    "failback: invocation failed with error {:?}, recording for retry",
577                    result.error
578                );
579                self.record_pending(ctx);
580                Ok(RPCResult::success(vec![]))
581            }
582            Err(e) => {
583                tracing::warn!("failback: invocation error: {e}, recording for retry");
584                self.record_pending(ctx);
585                Ok(RPCResult::success(vec![]))
586            }
587        }
588    }
589}
590
591impl FailbackClusterInvoker {
592    fn record_pending(&self, ctx: &InvocationContext) {
593        let pending = PendingInvocation {
594            method_name: ctx.method_name.clone(),
595            arguments: ctx.arguments.clone(),
596            parameter_types: ctx.parameter_types.clone(),
597            retry_count: 0,
598        };
599
600        let retry_delay = self.retry_delay;
601        let max_retries = self.max_retries;
602        let method_name = ctx.method_name.clone();
603
604        {
605            let mut queue = self.pending.write().unwrap();
606            queue.push(pending);
607        }
608
609        tracing::warn!(
610            "failback: scheduling retry in {:?} for method '{}'",
611            retry_delay,
612            method_name
613        );
614        tokio::spawn(async move {
615            tokio::time::sleep(retry_delay).await;
616            tracing::warn!(
617                "failback: retrying method '{}' (would attempt up to {} retries)",
618                method_name,
619                max_retries
620            );
621        });
622    }
623}
624
625// ============================================================================
626// ForkingCluster
627// ============================================================================
628
/// `ForkingCluster` — call several invokers in parallel and return the first
/// successful result. If every call fails, the last error is returned.
///
/// Suited to latency-critical operations where the extra resource use of
/// parallel calls is acceptable.
pub struct ForkingCluster {
    /// Maximum number of invokers called in parallel.
    forks: usize,
}

impl ForkingCluster {
    /// Create a forking cluster that calls up to 2 invokers in parallel.
    #[must_use]
    pub fn new() -> Self {
        let forks = 2;
        Self { forks }
    }

    /// Set the number of parallel calls; `0` is raised to `1`.
    #[must_use]
    pub fn with_forks(self, forks: usize) -> Self {
        let mut cluster = self;
        cluster.forks = forks.max(1);
        cluster
    }
}

impl Default for ForkingCluster {
    /// Same as [`ForkingCluster::new`].
    fn default() -> Self {
        ForkingCluster::new()
    }
}
656
657#[async_trait]
658impl Cluster for ForkingCluster {
659    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
660        Ok(Box::new(ForkingClusterInvoker {
661            directory,
662            forks: self.forks,
663        }))
664    }
665}
666
667struct ForkingClusterInvoker {
668    directory: Box<dyn Directory>,
669    forks: usize,
670}
671
672impl Node for ForkingClusterInvoker {
673    fn get_url(&self) -> &URL {
674        self.directory.get_url()
675    }
676
677    fn is_available(&self) -> bool {
678        true
679    }
680
681    fn destroy(&self) {}
682}
683
684#[async_trait]
685impl Invoker for ForkingClusterInvoker {
686    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
687        let invokers = self
688            .directory
689            .list(ctx)
690            .await
691            .map_err(|e| anyhow::anyhow!("{e}"))?;
692
693        if invokers.is_empty() {
694            return Err(anyhow::anyhow!("no invokers available"));
695        }
696
697        let selected: Vec<Arc<dyn Invoker>> = invokers.into_iter().take(self.forks).collect();
698
699        if selected.len() == 1 {
700            return selected[0].invoke(ctx).await;
701        }
702
703        let (tx, mut rx) =
704            tokio::sync::mpsc::channel::<(usize, Result<RPCResult, anyhow::Error>)>(selected.len());
705
706        for (i, invoker) in selected.into_iter().enumerate() {
707            let tx = tx.clone();
708            let mut fork_ctx = ctx.clone();
709            tokio::spawn(async move {
710                let result = invoker.invoke(&mut fork_ctx).await;
711                let _ = tx.send((i, result)).await;
712            });
713        }
714        drop(tx);
715
716        let total = {
717            let mut count = 0usize;
718            let mut last_error: Option<anyhow::Error> = None;
719            while let Some((_idx, result)) = rx.recv().await {
720                count += 1;
721                match result {
722                    Ok(r) if !r.is_error() => return Ok(r),
723                    Ok(r) => {
724                        last_error = Some(anyhow::anyhow!("{:?}", r.error));
725                    }
726                    Err(e) => {
727                        last_error = Some(e);
728                    }
729                }
730                if count >= self.forks {
731                    break;
732                }
733            }
734            last_error
735        };
736
737        Err(total.unwrap_or_else(|| anyhow::anyhow!("forking: all forks failed")))
738    }
739}
740
741// ============================================================================
742// BroadcastCluster
743// ============================================================================
744
/// `BroadcastCluster` — invoke ALL invokers one by one. If any invoker fails,
/// record the error but continue invoking the rest. After all invocations
/// complete, if there were any errors, return the first error. Otherwise
/// return the last successful result.
///
/// Used for operations that need to reach all providers, such as cache
/// updates or notification broadcasts.
#[derive(Default)]
pub struct BroadcastCluster;
759
760#[async_trait]
761impl Cluster for BroadcastCluster {
762    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
763        Ok(Box::new(BroadcastClusterInvoker { directory }))
764    }
765}
766
767struct BroadcastClusterInvoker {
768    directory: Box<dyn Directory>,
769}
770
771impl Node for BroadcastClusterInvoker {
772    fn get_url(&self) -> &URL {
773        self.directory.get_url()
774    }
775
776    fn is_available(&self) -> bool {
777        true
778    }
779
780    fn destroy(&self) {}
781}
782
783#[async_trait]
784impl Invoker for BroadcastClusterInvoker {
785    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
786        let invokers = self
787            .directory
788            .list(ctx)
789            .await
790            .map_err(|e| anyhow::anyhow!("{e}"))?;
791
792        if invokers.is_empty() {
793            return Err(anyhow::anyhow!("no invokers available"));
794        }
795
796        let mut first_error: Option<RPCError> = None;
797        let mut last_result: Option<RPCResult> = None;
798
799        for invoker in &invokers {
800            match invoker.invoke(ctx).await {
801                Ok(result) if !result.is_error() => {
802                    last_result = Some(result);
803                }
804                Ok(result) => {
805                    tracing::warn!(
806                        "broadcast: invoker {} returned error {:?}",
807                        invoker.get_url().get_address(),
808                        result.error
809                    );
810                    if first_error.is_none() {
811                        first_error = result.error.clone();
812                    }
813                }
814                Err(e) => {
815                    tracing::warn!(
816                        "broadcast: invoker {} failed: {e}",
817                        invoker.get_url().get_address()
818                    );
819                    if first_error.is_none() {
820                        first_error = Some(RPCError::ServerError(format!("{e}")));
821                    }
822                }
823            }
824        }
825
826        if let Some(err) = first_error {
827            return Err(anyhow::anyhow!(
828                "broadcast: one or more invokers failed, first error: {err:?}"
829            ));
830        }
831
832        last_result.ok_or_else(|| anyhow::anyhow!("broadcast: no results returned"))
833    }
834}
835
836// ============================================================================
837// AvailableCluster
838// ============================================================================
839
/// `AvailableCluster` — invoke on the first available invoker.
///
/// Iterates through all invokers returned by the directory and invokes on the
/// first one whose `is_available()` returns `true`. If none are available,
/// returns an error.
///
/// This is the simplest cluster strategy with no retry or failover logic.
#[derive(Default)]
pub struct AvailableCluster;
854
855#[async_trait]
856impl Cluster for AvailableCluster {
857    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
858        Ok(Box::new(AvailableClusterInvoker { directory }))
859    }
860}
861
862struct AvailableClusterInvoker {
863    directory: Box<dyn Directory>,
864}
865
866impl Node for AvailableClusterInvoker {
867    fn get_url(&self) -> &URL {
868        self.directory.get_url()
869    }
870
871    fn is_available(&self) -> bool {
872        true
873    }
874
875    fn destroy(&self) {}
876}
877
878#[async_trait]
879impl Invoker for AvailableClusterInvoker {
880    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
881        let invokers = self
882            .directory
883            .list(ctx)
884            .await
885            .map_err(|e| anyhow::anyhow!("{e}"))?;
886
887        for invoker in &invokers {
888            if invoker.is_available() {
889                return invoker.invoke(ctx).await;
890            }
891        }
892
893        Err(anyhow::anyhow!(
894            "available: no available provider found among {} invokers",
895            invokers.len()
896        ))
897    }
898}
899
900// ============================================================================
901// MockCluster
902// ============================================================================
903
/// `MockCluster` — returns mock results without calling downstream invokers.
///
/// Supports two modes:
/// - **force-mock** (`force = true`): always returns the mock result, never
///   calls the downstream invoker.
/// - **fail-mock** (`force = false`): tries the downstream invoker first;
///   if the call fails (returns an error or an `RPCResult` with an error),
///   falls back to the mock result.
///
/// Mock behaviour can also be overridden per-invocation via attachments:
/// - `mock` = `"force"` → force-mock for this call
/// - `mock` = `"fail"` → fail-mock for this call
/// - `mock.result` = `"<utf8-string>"` → mock result bytes for this call
///
/// The default configuration is fail-mock with no mock result.
#[derive(Default)]
pub struct MockCluster {
    /// `true` selects force-mock, `false` selects fail-mock.
    force: bool,
    /// Bytes returned as the mock result; `None` means none is configured.
    mock_result: Option<Vec<u8>>,
}

impl MockCluster {
    /// Create a fail-mock cluster with no mock result configured.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Choose between force-mock (`true`) and fail-mock (`false`).
    #[must_use]
    pub fn with_force(mut self, force: bool) -> Self {
        self.force = force;
        self
    }

    /// Set the bytes returned as the mock result.
    #[must_use]
    pub fn with_mock_result(mut self, result: Vec<u8>) -> Self {
        self.mock_result = Some(result);
        self
    }
}
951
952#[async_trait]
953impl Cluster for MockCluster {
954    async fn join(&self, directory: Box<dyn Directory>) -> Result<Box<dyn Invoker>, RPCError> {
955        Ok(Box::new(MockClusterInvoker {
956            directory,
957            force: self.force,
958            mock_result: self.mock_result.clone(),
959        }))
960    }
961}
962
963struct MockClusterInvoker {
964    directory: Box<dyn Directory>,
965    #[allow(dead_code)]
966    force: bool,
967    #[allow(dead_code)]
968    mock_result: Option<Vec<u8>>,
969}
970
971impl Node for MockClusterInvoker {
972    fn get_url(&self) -> &URL {
973        self.directory.get_url()
974    }
975
976    fn is_available(&self) -> bool {
977        true
978    }
979
980    fn destroy(&self) {}
981}
982
983#[async_trait]
984impl Invoker for MockClusterInvoker {
985    async fn invoke(&self, ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
986        // Determine effective force flag and mock result from attachments.
987        let attachment_force = ctx.attachments.get("mock").is_some_and(|v| v == "force");
988        let attachment_fail = ctx.attachments.get("mock").is_some_and(|v| v == "fail");
989        let attachment_mock_result = ctx
990            .attachments
991            .get("mock.result")
992            .map(|v| v.as_bytes().to_vec());
993
994        let force = attachment_force || (!attachment_fail && self.force);
995        let mock_result = attachment_mock_result.or_else(|| self.mock_result.clone());
996
997        if force {
998            // Force-mock: return mock result without calling downstream.
999            return Ok(RPCResult::success(mock_result.unwrap_or_default()));
1000        }
1001
1002        let invokers = match self.directory.list(ctx).await {
1003            Ok(invokers) if !invokers.is_empty() => invokers,
1004            _ => return Ok(RPCResult::success(mock_result.unwrap_or_default())),
1005        };
1006
1007        match invokers[0].invoke(ctx).await {
1008            Ok(result) if !result.is_error() => Ok(result),
1009            Ok(err_result) => match mock_result {
1010                Some(data) => Ok(RPCResult::success(data)),
1011                None => Ok(err_result),
1012            },
1013            Err(e) => match mock_result {
1014                Some(data) => Ok(RPCResult::success(data)),
1015                None => Err(e),
1016            },
1017        }
1018    }
1019}
1020
1021// ============================================================================
1022// Routers
1023// ============================================================================
1024
1025/// Condition-based router — filters invokers by matching rules against
1026/// invocation context parameters.
1027///
1028/// Rules follow the pattern `match_key=val => filter_key=val`:
1029/// - **Match side**: conditions that must ALL be true for the rule to activate
1030/// - **Filter side**: conditions applied to invoker URL params
1031pub struct ConditionRouter {
1032    match_rules: Vec<(String, String)>,
1033    filter_rules: Vec<(String, String)>,
1034}
1035
1036impl ConditionRouter {
1037    /// Parse a rule like `"region=beijing => env=gray"` or `"=> env=gray"`
1038    /// (always matches).
1039    #[must_use]
1040    pub fn parse(rule: &str) -> Option<Self> {
1041        let parts: Vec<&str> = rule.splitn(2, "=>").collect();
1042        let lhs = parts[0].trim();
1043        let rhs = parts.get(1).map_or("", |s| s.trim());
1044
1045        // If there is no `=>`, the entire string is filter rules.
1046        if parts.len() == 1 {
1047            return Some(Self {
1048                match_rules: Vec::new(),
1049                filter_rules: Self::parse_kv_pairs(lhs),
1050            });
1051        }
1052
1053        Some(Self {
1054            match_rules: Self::parse_kv_pairs(lhs),
1055            filter_rules: Self::parse_kv_pairs(rhs),
1056        })
1057    }
1058
1059    fn parse_kv_pairs(s: &str) -> Vec<(String, String)> {
1060        if s.is_empty() {
1061            return Vec::new();
1062        }
1063        s.split(',')
1064            .filter_map(|kv| {
1065                let mut it = kv.splitn(2, '=');
1066                let k = it.next()?.trim();
1067                let v = it.next()?.trim();
1068                if k.is_empty() {
1069                    None
1070                } else {
1071                    Some((k.to_string(), v.to_string()))
1072                }
1073            })
1074            .collect()
1075    }
1076
1077    /// Check whether the router applies to the given invocation.
1078    #[must_use]
1079    pub fn matches_invocation(&self, ctx: &InvocationContext) -> bool {
1080        if self.match_rules.is_empty() {
1081            return true;
1082        }
1083        self.match_rules
1084            .iter()
1085            .all(|(k, v)| ctx.attachments.get(k).is_some_and(|val| val == v))
1086    }
1087
1088    /// Return the indices of invokers whose URL params satisfy all filter rules.
1089    #[must_use]
1090    pub fn filter_invokers(&self, invokers: &[Arc<dyn Invoker>]) -> Vec<usize> {
1091        invokers
1092            .iter()
1093            .enumerate()
1094            .filter(|(_, inv)| {
1095                self.filter_rules
1096                    .iter()
1097                    .all(|(k, v)| inv.get_url().get_param(k).is_some_and(|val| val == v))
1098            })
1099            .map(|(i, _)| i)
1100            .collect()
1101    }
1102}
1103
1104/// Tag-based router for traffic coloring.
1105///
1106/// Reads the `dubbo.tag` attachment from the invocation context and routes
1107/// to invokers whose `tag` URL parameter matches. Falls back to untagged
1108/// invokers when no tagged invoker matches.
1109pub struct TagRouter {
1110    tag_key: String,
1111}
1112
1113impl TagRouter {
1114    #[must_use]
1115    pub fn new() -> Self {
1116        Self {
1117            tag_key: "dubbo.tag".to_string(),
1118        }
1119    }
1120
1121    #[must_use]
1122    pub fn with_tag_key(mut self, key: impl Into<String>) -> Self {
1123        self.tag_key = key.into();
1124        self
1125    }
1126
1127    /// Filter invokers by tag. Returns the indices of matching invokers.
1128    ///
1129    /// - If no tag is requested, all invokers are returned.
1130    /// - If a tag is requested, only invokers with a matching `tag` param
1131    ///   are returned.
1132    /// - If no tagged invoker matches, falls back to invokers without a tag.
1133    #[must_use]
1134    pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1135        let requested_tag = ctx.attachments.get(&self.tag_key);
1136
1137        let Some(tag) = requested_tag else {
1138            return (0..invokers.len()).collect();
1139        };
1140
1141        let tagged: Vec<usize> = invokers
1142            .iter()
1143            .enumerate()
1144            .filter(|(_, inv)| inv.get_url().get_param("tag").is_some_and(|t| t == tag))
1145            .map(|(i, _)| i)
1146            .collect();
1147
1148        if !tagged.is_empty() {
1149            return tagged;
1150        }
1151
1152        invokers
1153            .iter()
1154            .enumerate()
1155            .filter(|(_, inv)| inv.get_url().get_param("tag").is_none())
1156            .map(|(i, _)| i)
1157            .collect()
1158    }
1159}
1160
1161impl Default for TagRouter {
1162    fn default() -> Self {
1163        Self::new()
1164    }
1165}
1166
1167/// Script-based router powered by the rhai scripting engine.
1168///
1169/// Evaluates routing rules written in rhai script to filter invokers.
1170/// The script has access to helper functions for querying invoker
1171/// properties and invocation context, and must return an array of
1172/// selected invoker indices.
1173///
1174/// # Script API
1175///
1176/// | Function | Return | Description |
1177/// |----------|--------|-------------|
1178/// | `invoker_count()` | `i64` | Number of invokers |
1179/// | `invoker_ip(i)` | `String` | IP of invoker at index `i` |
1180/// | `invoker_has_param(i, key)` | `bool` | Check URL param existence |
1181/// | `invoker_get_param(i, key)` | `String` | Get URL param value |
1182/// | `method_name()` | `String` | Current method name |
1183/// | `has_attachment(key)` | `bool` | Check attachment existence |
1184/// | `get_attachment(key)` | `String` | Get attachment value |
1185///
1186/// The script must return an `Array` of `i64` indices.
1187pub struct ScriptRouter {
1188    script: String,
1189    compiled: rhai::AST,
1190}
1191
1192impl ScriptRouter {
1193    /// Compile a rhai script and create a new `ScriptRouter`.
1194    ///
1195    /// # Errors
1196    ///
1197    /// Returns `rhai::EvalAltResult` if the script fails to compile.
1198    pub fn new(script: &str) -> Result<Self, Box<rhai::EvalAltResult>> {
1199        let mut engine = rhai::Engine::new();
1200        engine.set_max_expr_depths(64, 64);
1201        engine.set_max_operations(1000);
1202        engine.set_max_string_size(1024);
1203        engine.set_max_array_size(256);
1204
1205        let compiled = engine.compile(script)?;
1206        Ok(Self {
1207            script: script.to_string(),
1208            compiled,
1209        })
1210    }
1211
1212    #[must_use]
1213    #[allow(
1214        clippy::cast_possible_wrap,
1215        clippy::cast_possible_truncation,
1216        clippy::cast_sign_loss
1217    )]
1218    pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1219        let mut engine = rhai::Engine::new();
1220        engine.set_max_expr_depths(64, 64);
1221        engine.set_max_operations(1000);
1222        engine.set_max_string_size(1024);
1223        engine.set_max_array_size(256);
1224
1225        let invokers_len = invokers.len();
1226        let invoker_arcs: Vec<Arc<dyn Invoker>> = invokers.to_vec();
1227        let method = ctx.method_name.clone();
1228        let attachments: std::collections::HashMap<String, String> = ctx.attachments.clone();
1229
1230        engine.register_fn("invoker_count", move || -> i64 { invokers_len as i64 });
1231
1232        let arcs = invoker_arcs.clone();
1233        engine.register_fn("invoker_ip", move |index: i64| -> String {
1234            let idx = index as usize;
1235            arcs.get(idx)
1236                .map(|a| a.get_url().ip.clone())
1237                .unwrap_or_default()
1238        });
1239
1240        let arcs = invoker_arcs.clone();
1241        engine.register_fn(
1242            "invoker_has_param",
1243            move |index: i64, key: String| -> bool {
1244                let idx = index as usize;
1245                arcs.get(idx)
1246                    .is_some_and(|a| a.get_url().get_param(&key).is_some())
1247            },
1248        );
1249
1250        let arcs = invoker_arcs;
1251        engine.register_fn(
1252            "invoker_get_param",
1253            move |index: i64, key: String| -> String {
1254                let idx = index as usize;
1255                arcs.get(idx)
1256                    .and_then(|a| a.get_url().get_param(&key).cloned())
1257                    .unwrap_or_default()
1258            },
1259        );
1260
1261        engine.register_fn("method_name", move || -> String { method.clone() });
1262
1263        let att = attachments.clone();
1264        engine.register_fn("has_attachment", move |key: String| -> bool {
1265            att.contains_key(&key)
1266        });
1267
1268        engine.register_fn("get_attachment", move |key: String| -> String {
1269            attachments.get(&key).cloned().unwrap_or_default()
1270        });
1271
1272        let mut scope = rhai::Scope::new();
1273        let result = engine.eval_ast_with_scope::<rhai::Dynamic>(&mut scope, &self.compiled);
1274
1275        match result {
1276            Ok(dynamic) => {
1277                if let Some(arr) = dynamic.clone().try_cast::<rhai::Array>() {
1278                    arr.into_iter()
1279                        .filter_map(|v| v.try_cast::<i64>().map(|i| i as usize))
1280                        .filter(|&i| i < invokers_len)
1281                        .collect()
1282                } else {
1283                    (0..invokers_len).collect()
1284                }
1285            }
1286            Err(_) => (0..invokers_len).collect(),
1287        }
1288    }
1289
1290    #[must_use]
1291    pub fn script(&self) -> &str {
1292        &self.script
1293    }
1294}
1295
1296/// A chain of routers that filters invokers sequentially.
1297///
1298/// Each router receives the output of the previous one. If any router
1299/// removes all invokers, the chain returns an empty list.
1300pub struct RouterChain {
1301    condition_routers: Vec<ConditionRouter>,
1302    tag_router: Option<TagRouter>,
1303    script_router: Option<ScriptRouter>,
1304}
1305
1306impl RouterChain {
1307    #[must_use]
1308    pub fn new() -> Self {
1309        Self {
1310            condition_routers: Vec::new(),
1311            tag_router: None,
1312            script_router: None,
1313        }
1314    }
1315
1316    pub fn add_condition_router(&mut self, router: ConditionRouter) {
1317        self.condition_routers.push(router);
1318    }
1319
1320    pub fn set_tag_router(&mut self, router: TagRouter) {
1321        self.tag_router = Some(router);
1322    }
1323
1324    #[must_use]
1325    pub fn with_condition_router(mut self, router: ConditionRouter) -> Self {
1326        self.condition_routers.push(router);
1327        self
1328    }
1329
1330    #[must_use]
1331    pub fn with_tag_router(mut self, router: TagRouter) -> Self {
1332        self.tag_router = Some(router);
1333        self
1334    }
1335
1336    #[must_use]
1337    pub fn with_script_router(mut self, router: ScriptRouter) -> Self {
1338        self.script_router = Some(router);
1339        self
1340    }
1341
1342    pub fn set_script_router(&mut self, router: ScriptRouter) {
1343        self.script_router = Some(router);
1344    }
1345
1346    /// Route invokers through all configured routers.
1347    ///
1348    /// Returns the indices of invokers that pass every routing rule.
1349    #[must_use]
1350    pub fn route(&self, invokers: &[Arc<dyn Invoker>], ctx: &InvocationContext) -> Vec<usize> {
1351        let mut current: Vec<usize> = (0..invokers.len()).collect();
1352
1353        for cr in &self.condition_routers {
1354            if cr.matches_invocation(ctx) {
1355                let filtered = cr.filter_invokers(invokers);
1356                current.retain(|i| filtered.contains(i));
1357                if current.is_empty() {
1358                    return current;
1359                }
1360            }
1361        }
1362
1363        if let Some(ref tr) = self.tag_router {
1364            let filtered = tr.route(invokers, ctx);
1365            current.retain(|i| filtered.contains(i));
1366        }
1367
1368        if let Some(ref sr) = self.script_router {
1369            let filtered = sr.route(invokers, ctx);
1370            current.retain(|i| filtered.contains(i));
1371        }
1372
1373        current
1374    }
1375}
1376
1377impl Default for RouterChain {
1378    fn default() -> Self {
1379        Self::new()
1380    }
1381}
1382
1383#[cfg(test)]
1384mod tests {
1385    use super::*;
1386    use std::sync::Arc;
1387
1388    fn make_url(host: &str, port: &str, path: &str) -> URL {
1389        let mut url = URL::new("tri", path);
1390        url.ip = host.to_string();
1391        url.port = port.to_string();
1392        url
1393    }
1394
/// A `StaticDirectory` created without invokers reports a count of zero and
/// exposes the URL it was constructed with.
#[test]
fn test_static_directory_empty_returns_error() {
    let service_url = URL::new("tri", "/com.example.Service");
    let dir = StaticDirectory::new(service_url);
    let _ctx = InvocationContext::new("sayHello", URL::new("tri", "/com.example.Service"));

    let registered = dir.invoker_count();
    assert_eq!(registered, 0);
    assert_eq!(dir.get_url().path, "/com.example.Service");
}
1403
/// A new `RegistryDirectory` starts with no invokers and keeps its URL.
#[test]
fn test_registry_directory_new_is_empty() {
    let service_url = URL::new("tri", "/com.example.Service");
    let dir = RegistryDirectory::new(service_url);

    let registered = dir.invoker_count();
    assert_eq!(registered, 0);
    assert_eq!(dir.get_url().path, "/com.example.Service");
}
1410
/// Refreshing with two provider URLs leaves two invokers in the directory.
#[test]
fn test_registry_directory_refresh_invokers() {
    let service = "/com.example.Service";
    let dir = RegistryDirectory::new(URL::new("tri", service));

    let providers: Vec<URL> = ["192.168.1.1", "192.168.1.2"]
        .iter()
        .map(|host| make_url(host, "50051", service))
        .collect();
    dir.refresh_invokers(&providers);

    assert_eq!(dir.invoker_count(), 2);
}
1421
/// Refreshing a shared (`Arc`) directory with one provider yields one invoker.
#[test]
fn test_registry_directory_notify_add() {
    let service = "/com.example.Service";
    let dir = Arc::new(RegistryDirectory::new(URL::new("tri", service)));

    let added = vec![make_url("192.168.1.1", "50051", service)];
    dir.refresh_invokers(&added);

    assert_eq!(dir.invoker_count(), 1);
}
1433
/// Refreshing with a shorter provider list shrinks the invoker count.
#[test]
fn test_registry_directory_notify_remove() {
    let service = "/com.example.Service";
    let dir = Arc::new(RegistryDirectory::new(URL::new("tri", service)));

    let mut providers = vec![
        make_url("192.168.1.1", "50051", service),
        make_url("192.168.1.2", "50051", service),
    ];
    dir.refresh_invokers(&providers);
    assert_eq!(dir.invoker_count(), 2);

    // Drop the second provider; only 192.168.1.1 remains.
    providers.truncate(1);
    dir.refresh_invokers(&providers);
    assert_eq!(dir.invoker_count(), 1);
}
1451
1452    struct MockInvoker {
1453        url: URL,
1454        succeed: bool,
1455        call_count: std::sync::atomic::AtomicUsize,
1456    }
1457
1458    impl MockInvoker {
1459        fn new(url: URL, succeed: bool) -> Self {
1460            Self {
1461                url,
1462                succeed,
1463                call_count: std::sync::atomic::AtomicUsize::new(0),
1464            }
1465        }
1466    }
1467
1468    impl Node for MockInvoker {
1469        fn get_url(&self) -> &URL {
1470            &self.url
1471        }
1472        fn is_available(&self) -> bool {
1473            true
1474        }
1475        fn destroy(&self) {}
1476    }
1477
1478    #[async_trait]
1479    impl Invoker for MockInvoker {
1480        async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
1481            self.call_count
1482                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
1483            if self.succeed {
1484                Ok(RPCResult::success(b"mock_response".to_vec()))
1485            } else {
1486                Ok(RPCResult::from_error(RPCError::ServerError(
1487                    "mock failure".into(),
1488                )))
1489            }
1490        }
1491    }
1492
1493    struct MockDirectory {
1494        url: URL,
1495        invokers: Vec<Arc<dyn Invoker>>,
1496    }
1497
1498    impl MockDirectory {
1499        fn new(invokers: Vec<Arc<dyn Invoker>>) -> Self {
1500            Self {
1501                url: URL::new("tri", "/mock"),
1502                invokers,
1503            }
1504        }
1505    }
1506
1507    #[async_trait]
1508    impl Directory for MockDirectory {
1509        async fn list(&self, _ctx: &InvocationContext) -> Result<Vec<Arc<dyn Invoker>>, RPCError> {
1510            if self.invokers.is_empty() {
1511                return Err(RPCError::ServiceNotFound("no invokers".into()));
1512            }
1513            Ok(self.invokers.iter().map(Arc::clone).collect())
1514        }
1515
1516        fn get_url(&self) -> &URL {
1517            &self.url
1518        }
1519    }
1520
1521    #[tokio::test]
1522    async fn test_failfast_cluster_success() {
1523        let invoker = Arc::new(MockInvoker::new(
1524            make_url("192.168.1.1", "50051", "/svc"),
1525            true,
1526        ));
1527        let directory = Box::new(MockDirectory::new(vec![invoker]));
1528        let cluster = FailfastCluster;
1529        let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1530    }
1531
1532    #[tokio::test]
1533    async fn test_failfast_cluster_empty_directory() {
1534        let directory = Box::new(MockDirectory::new(vec![]));
1535        let cluster = FailfastCluster;
1536        let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1537    }
1538
1539    #[tokio::test]
1540    async fn test_failover_cluster_creation() {
1541        let invokers: Vec<Arc<dyn Invoker>> = (0..3)
1542            .map(|i| {
1543                Arc::new(MockInvoker::new(
1544                    make_url("192.168.1.1", &format!("5005{i}"), "/svc"),
1545                    true,
1546                )) as Arc<dyn Invoker>
1547            })
1548            .collect();
1549        let directory = Box::new(MockDirectory::new(invokers));
1550        let cluster = FailoverCluster::new().with_retries(3);
1551        let _cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1552    }
1553
/// `FailoverCluster::new()` defaults to two retries.
///
/// Plain synchronous test: nothing here awaits, so no async runtime is needed.
#[test]
fn test_failover_cluster_default_retries() {
    let cluster = FailoverCluster::new();
    assert_eq!(cluster.retries, 2);
}
1559
1560    #[tokio::test]
1561    async fn test_provider_invoker_invoke_returns_error() {
1562        let provider_url = make_url("192.168.1.1", "50051", "/com.example.Service");
1563        let invoker = ProviderInvoker {
1564            provider_url: provider_url.clone(),
1565        };
1566        let mut ctx = InvocationContext::new("sayHello", provider_url);
1567        let result = invoker.invoke(&mut ctx).await;
1568        assert!(result.is_err());
1569        let err_msg = result.unwrap_err().to_string();
1570        assert!(
1571            err_msg.contains("no protocol invoker"),
1572            "expected message about missing invoker factory, got: {err_msg}"
1573        );
1574    }
1575
1576    #[tokio::test]
1577    async fn test_static_directory_list_filters_unavailable() {
1578        use std::sync::atomic::{AtomicBool, Ordering};
1579
1580        struct AvailabilityInvoker {
1581            url: URL,
1582            available: AtomicBool,
1583        }
1584
1585        impl Node for AvailabilityInvoker {
1586            fn get_url(&self) -> &URL {
1587                &self.url
1588            }
1589            fn is_available(&self) -> bool {
1590                self.available.load(Ordering::SeqCst)
1591            }
1592            fn destroy(&self) {}
1593        }
1594
1595        #[async_trait]
1596        impl Invoker for AvailabilityInvoker {
1597            async fn invoke(
1598                &self,
1599                _ctx: &mut InvocationContext,
1600            ) -> Result<RPCResult, anyhow::Error> {
1601                Ok(RPCResult::success(b"ok".to_vec()))
1602            }
1603        }
1604
1605        let dir = StaticDirectory::new(URL::new("tri", "/com.example.Service"));
1606
1607        let inv1 = Arc::new(AvailabilityInvoker {
1608            url: make_url("192.168.1.1", "50051", "/svc"),
1609            available: AtomicBool::new(true),
1610        });
1611        let inv2 = Arc::new(AvailabilityInvoker {
1612            url: make_url("192.168.1.2", "50051", "/svc"),
1613            available: AtomicBool::new(false),
1614        });
1615        let inv3 = Arc::new(AvailabilityInvoker {
1616            url: make_url("192.168.1.3", "50051", "/svc"),
1617            available: AtomicBool::new(true),
1618        });
1619
1620        dir.add_invoker(inv1);
1621        dir.add_invoker(inv2);
1622        dir.add_invoker(inv3);
1623
1624        assert_eq!(dir.invoker_count(), 3);
1625
1626        let ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1627        let available = dir.list(&ctx).await.expect("list should succeed");
1628        assert_eq!(
1629            available.len(),
1630            2,
1631            "only available invokers should be returned"
1632        );
1633    }
1634
/// With a custom invoker factory installed, refreshing with three providers
/// leaves three invokers in the directory.
#[test]
fn test_registry_directory_with_invoker_factory() {
    let service = "/com.example.Service";
    let dir = RegistryDirectory::new(URL::new("tri", service)).with_invoker_factory(|url| {
        Ok(Box::new(MockInvoker::new(url.clone(), true)) as Box<dyn Invoker>)
    });

    let providers: Vec<URL> = ["192.168.1.1", "192.168.1.2", "192.168.1.3"]
        .iter()
        .map(|host| make_url(host, "50051", service))
        .collect();
    dir.refresh_invokers(&providers);

    assert_eq!(dir.invoker_count(), 3);
}
1650
/// `with_retries` stores the requested retry count.
#[test]
fn test_failover_cluster_retries_count() {
    let configured = FailoverCluster::new().with_retries(5);
    let retries = configured.retries;
    assert_eq!(retries, 5);
}
1656
1657    #[tokio::test]
1658    async fn test_failfast_cluster_join_with_empty_directory() {
1659        let directory = Box::new(MockDirectory::new(vec![]));
1660        let cluster = FailfastCluster;
1661        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1662
1663        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1664        let result = cluster_invoker.invoke(&mut ctx).await;
1665        assert!(result.is_err(), "invoke on empty directory should fail");
1666    }
1667
1668    #[tokio::test]
1669    async fn test_failover_cluster_invoker_retries_on_failure() {
1670        let inv1 = Arc::new(MockInvoker::new(
1671            make_url("192.168.1.1", "50051", "/svc"),
1672            false,
1673        ));
1674        let inv2 = Arc::new(MockInvoker::new(
1675            make_url("192.168.1.2", "50051", "/svc"),
1676            false,
1677        ));
1678        let inv3 = Arc::new(MockInvoker::new(
1679            make_url("192.168.1.3", "50051", "/svc"),
1680            false,
1681        ));
1682
1683        let call1 = Arc::clone(&inv1);
1684        let call2 = Arc::clone(&inv2);
1685        let call3 = Arc::clone(&inv3);
1686
1687        let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
1688        let cluster = FailoverCluster::new().with_retries(0);
1689        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1690
1691        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1692        let result = cluster_invoker.invoke(&mut ctx).await;
1693        assert!(result.is_err(), "all invokers fail, should return error");
1694
1695        // With retries=0, there is 1 attempt iterating all 3 invokers
1696        assert_eq!(
1697            call1.call_count.load(std::sync::atomic::Ordering::SeqCst),
1698            1,
1699            "invoker 1 should be called once"
1700        );
1701        assert_eq!(
1702            call2.call_count.load(std::sync::atomic::Ordering::SeqCst),
1703            1,
1704            "invoker 2 should be called once"
1705        );
1706        assert_eq!(
1707            call3.call_count.load(std::sync::atomic::Ordering::SeqCst),
1708            1,
1709            "invoker 3 should be called once"
1710        );
1711    }
1712
1713    // =========================================================================
1714    // FailsafeCluster tests
1715    // =========================================================================
1716
1717    #[tokio::test]
1718    async fn test_failsafe_cluster_all_success() {
1719        let invoker = Arc::new(MockInvoker::new(
1720            make_url("192.168.1.1", "50051", "/svc"),
1721            true,
1722        ));
1723        let directory = Box::new(MockDirectory::new(vec![invoker]));
1724        let cluster = FailsafeCluster;
1725        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1726
1727        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1728        let result = cluster_invoker.invoke(&mut ctx).await;
1729        assert!(result.is_ok(), "failsafe should always return Ok");
1730        let rpc = result.unwrap();
1731        assert!(
1732            !rpc.is_error(),
1733            "successful invocation should not have error"
1734        );
1735        assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1736    }
1737
1738    #[tokio::test]
1739    async fn test_failsafe_cluster_all_failing() {
1740        let invoker = Arc::new(MockInvoker::new(
1741            make_url("192.168.1.1", "50051", "/svc"),
1742            false,
1743        ));
1744        let directory = Box::new(MockDirectory::new(vec![invoker]));
1745        let cluster = FailsafeCluster;
1746        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1747
1748        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1749        let result = cluster_invoker.invoke(&mut ctx).await;
1750        assert!(result.is_ok(), "failsafe should swallow errors");
1751        let rpc = result.unwrap();
1752        assert!(!rpc.is_error(), "failsafe returns empty success on failure");
1753        assert_eq!(rpc.value, Some(vec![]));
1754    }
1755
1756    #[tokio::test]
1757    async fn test_failsafe_cluster_empty_directory() {
1758        let directory = Box::new(MockDirectory::new(vec![]));
1759        let cluster = FailsafeCluster;
1760        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1761
1762        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1763        let result = cluster_invoker.invoke(&mut ctx).await;
1764        assert!(result.is_ok(), "failsafe should handle empty directory");
1765        let rpc = result.unwrap();
1766        assert!(!rpc.is_error());
1767        assert_eq!(rpc.value, Some(vec![]));
1768    }
1769
/// `FailsafeCluster` can be constructed as a plain unit value.
///
/// Plain synchronous test: nothing here awaits, so no async runtime is needed.
#[test]
fn test_failsafe_cluster_default() {
    let cluster = FailsafeCluster;
    let _ = cluster;
}
1775
1776    // =========================================================================
1777    // FailbackCluster tests
1778    // =========================================================================
1779
1780    #[tokio::test]
1781    async fn test_failback_cluster_all_success() {
1782        let invoker = Arc::new(MockInvoker::new(
1783            make_url("192.168.1.1", "50051", "/svc"),
1784            true,
1785        ));
1786        let directory = Box::new(MockDirectory::new(vec![invoker]));
1787        let cluster = FailbackCluster::new();
1788        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1789
1790        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1791        let result = cluster_invoker.invoke(&mut ctx).await;
1792        assert!(result.is_ok());
1793        let rpc = result.unwrap();
1794        assert!(!rpc.is_error());
1795        assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1796    }
1797
1798    #[tokio::test]
1799    async fn test_failback_cluster_failure_returns_empty_success() {
1800        let invoker = Arc::new(MockInvoker::new(
1801            make_url("192.168.1.1", "50051", "/svc"),
1802            false,
1803        ));
1804        let directory = Box::new(MockDirectory::new(vec![invoker]));
1805        let cluster = FailbackCluster::new();
1806        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1807
1808        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1809        let result = cluster_invoker.invoke(&mut ctx).await;
1810        assert!(result.is_ok(), "failback should return Ok on failure");
1811        let rpc = result.unwrap();
1812        assert!(!rpc.is_error(), "failback returns empty success");
1813        assert_eq!(rpc.value, Some(vec![]));
1814    }
1815
1816    #[tokio::test]
1817    async fn test_failback_cluster_empty_directory() {
1818        let directory = Box::new(MockDirectory::new(vec![]));
1819        let cluster = FailbackCluster::new();
1820        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1821
1822        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1823        let result = cluster_invoker.invoke(&mut ctx).await;
1824        assert!(result.is_ok());
1825        let rpc = result.unwrap();
1826        assert!(!rpc.is_error());
1827        assert_eq!(rpc.value, Some(vec![]));
1828    }
1829
/// Defaults of `FailbackCluster::new()`: 3 retries, 5 second retry delay.
#[test]
fn test_failback_cluster_default_config() {
    use std::time::Duration;

    let defaults = FailbackCluster::new();
    assert_eq!(defaults.max_retries, 3);
    assert_eq!(defaults.retry_delay, Duration::from_secs(5));
}
1836
/// The builder methods store the custom retry delay and retry limit.
#[test]
fn test_failback_cluster_custom_config() {
    use std::time::Duration;

    let ten_seconds = Duration::from_secs(10);
    let tuned = FailbackCluster::new()
        .with_retry_delay(ten_seconds)
        .with_max_retries(5);

    assert_eq!(tuned.retry_delay, Duration::from_secs(10));
    assert_eq!(tuned.max_retries, 5);
}
1845
1846    // =========================================================================
1847    // ForkingCluster tests
1848    // =========================================================================
1849
1850    #[tokio::test]
1851    async fn test_forking_cluster_all_success() {
1852        let inv1 = Arc::new(MockInvoker::new(
1853            make_url("192.168.1.1", "50051", "/svc"),
1854            true,
1855        ));
1856        let inv2 = Arc::new(MockInvoker::new(
1857            make_url("192.168.1.2", "50051", "/svc"),
1858            true,
1859        ));
1860        let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1861        let cluster = ForkingCluster::new();
1862        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1863
1864        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1865        let result = cluster_invoker.invoke(&mut ctx).await;
1866        assert!(result.is_ok());
1867        let rpc = result.unwrap();
1868        assert!(!rpc.is_error());
1869        assert_eq!(rpc.value, Some(b"mock_response".to_vec()));
1870    }
1871
1872    #[tokio::test]
1873    async fn test_forking_cluster_all_fail() {
1874        let inv1 = Arc::new(MockInvoker::new(
1875            make_url("192.168.1.1", "50051", "/svc"),
1876            false,
1877        ));
1878        let inv2 = Arc::new(MockInvoker::new(
1879            make_url("192.168.1.2", "50051", "/svc"),
1880            false,
1881        ));
1882        let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1883        let cluster = ForkingCluster::new();
1884        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1885
1886        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1887        let result = cluster_invoker.invoke(&mut ctx).await;
1888        assert!(result.is_err(), "all forks failed should return error");
1889    }
1890
1891    #[tokio::test]
1892    async fn test_forking_cluster_mixed() {
1893        let inv1 = Arc::new(MockInvoker::new(
1894            make_url("192.168.1.1", "50051", "/svc"),
1895            false,
1896        ));
1897        let inv2 = Arc::new(MockInvoker::new(
1898            make_url("192.168.1.2", "50051", "/svc"),
1899            true,
1900        ));
1901        let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1902        let cluster = ForkingCluster::new();
1903        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1904
1905        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1906        let result = cluster_invoker.invoke(&mut ctx).await;
1907        assert!(
1908            result.is_ok(),
1909            "should succeed with at least one successful fork"
1910        );
1911        let rpc = result.unwrap();
1912        assert!(!rpc.is_error());
1913    }
1914
1915    #[tokio::test]
1916    async fn test_forking_cluster_empty_directory() {
1917        let directory = Box::new(MockDirectory::new(vec![]));
1918        let cluster = ForkingCluster::new();
1919        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1920
1921        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1922        let result = cluster_invoker.invoke(&mut ctx).await;
1923        assert!(result.is_err(), "empty directory should return error");
1924    }
1925
1926    #[test]
1927    fn test_forking_cluster_default_forks() {
1928        let cluster = ForkingCluster::new();
1929        assert_eq!(cluster.forks, 2);
1930    }
1931
1932    #[test]
1933    fn test_forking_cluster_custom_forks() {
1934        let cluster = ForkingCluster::new().with_forks(5);
1935        assert_eq!(cluster.forks, 5);
1936    }
1937
1938    #[test]
1939    fn test_forking_cluster_zero_forks_clamped() {
1940        let cluster = ForkingCluster::new().with_forks(0);
1941        assert_eq!(cluster.forks, 1, "zero forks should be clamped to 1");
1942    }
1943
1944    // =========================================================================
1945    // BroadcastCluster tests
1946    // =========================================================================
1947
1948    #[tokio::test]
1949    async fn test_broadcast_cluster_all_success() {
1950        let inv1 = Arc::new(MockInvoker::new(
1951            make_url("192.168.1.1", "50051", "/svc"),
1952            true,
1953        ));
1954        let inv2 = Arc::new(MockInvoker::new(
1955            make_url("192.168.1.2", "50051", "/svc"),
1956            true,
1957        ));
1958        let inv3 = Arc::new(MockInvoker::new(
1959            make_url("192.168.1.3", "50051", "/svc"),
1960            true,
1961        ));
1962        let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
1963        let cluster = BroadcastCluster;
1964        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1965
1966        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1967        let result = cluster_invoker.invoke(&mut ctx).await;
1968        assert!(result.is_ok());
1969        let rpc = result.unwrap();
1970        assert!(!rpc.is_error());
1971    }
1972
1973    #[tokio::test]
1974    async fn test_broadcast_cluster_all_fail() {
1975        let inv1 = Arc::new(MockInvoker::new(
1976            make_url("192.168.1.1", "50051", "/svc"),
1977            false,
1978        ));
1979        let inv2 = Arc::new(MockInvoker::new(
1980            make_url("192.168.1.2", "50051", "/svc"),
1981            false,
1982        ));
1983        let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
1984        let cluster = BroadcastCluster;
1985        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
1986
1987        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
1988        let result = cluster_invoker.invoke(&mut ctx).await;
1989        assert!(result.is_err(), "all invokers failing should return error");
1990    }
1991
1992    #[tokio::test]
1993    async fn test_broadcast_cluster_mixed() {
1994        let inv1 = Arc::new(MockInvoker::new(
1995            make_url("192.168.1.1", "50051", "/svc"),
1996            true,
1997        ));
1998        let inv2 = Arc::new(MockInvoker::new(
1999            make_url("192.168.1.2", "50051", "/svc"),
2000            false,
2001        ));
2002        let inv3 = Arc::new(MockInvoker::new(
2003            make_url("192.168.1.3", "50051", "/svc"),
2004            true,
2005        ));
2006        let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2007        let cluster = BroadcastCluster;
2008        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2009
2010        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2011        let result = cluster_invoker.invoke(&mut ctx).await;
2012        assert!(
2013            result.is_err(),
2014            "any failure in broadcast should return error"
2015        );
2016    }
2017
2018    #[tokio::test]
2019    async fn test_broadcast_cluster_empty_directory() {
2020        let directory = Box::new(MockDirectory::new(vec![]));
2021        let cluster = BroadcastCluster;
2022        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2023
2024        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2025        let result = cluster_invoker.invoke(&mut ctx).await;
2026        assert!(result.is_err(), "empty directory should return error");
2027    }
2028
2029    #[tokio::test]
2030    async fn test_broadcast_cluster_default() {
2031        let cluster = BroadcastCluster;
2032        let _ = cluster;
2033    }
2034
2035    // =========================================================================
2036    // AvailableCluster tests
2037    // =========================================================================
2038
    /// Test double whose availability and invocation outcome are configured
    /// independently of each other.
    struct AvailableMockInvoker {
        /// URL reported through `Node::get_url`; its `ip` is echoed back as
        /// the payload of a successful invocation.
        url: URL,
        /// Value returned from `Node::is_available`.
        available: bool,
        /// Whether `invoke` produces a success result or an error result.
        succeed: bool,
    }
2044
2045    impl AvailableMockInvoker {
2046        fn new(url: URL, available: bool, succeed: bool) -> Self {
2047            Self {
2048                url,
2049                available,
2050                succeed,
2051            }
2052        }
2053    }
2054
2055    impl Node for AvailableMockInvoker {
2056        fn get_url(&self) -> &URL {
2057            &self.url
2058        }
2059        fn is_available(&self) -> bool {
2060            self.available
2061        }
2062        fn destroy(&self) {}
2063    }
2064
2065    #[async_trait]
2066    impl Invoker for AvailableMockInvoker {
2067        async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
2068            if self.succeed {
2069                Ok(RPCResult::success(self.url.ip.as_bytes().to_vec()))
2070            } else {
2071                Ok(RPCResult::from_error(RPCError::ServerError(
2072                    "mock failure".into(),
2073                )))
2074            }
2075        }
2076    }
2077
2078    #[tokio::test]
2079    async fn test_available_cluster_first_available() {
2080        let inv1 = Arc::new(AvailableMockInvoker::new(
2081            make_url("192.168.1.1", "50051", "/svc"),
2082            false,
2083            true,
2084        ));
2085        let inv2 = Arc::new(AvailableMockInvoker::new(
2086            make_url("192.168.1.2", "50051", "/svc"),
2087            false,
2088            true,
2089        ));
2090        let inv3 = Arc::new(AvailableMockInvoker::new(
2091            make_url("192.168.1.3", "50051", "/svc"),
2092            true,
2093            true,
2094        ));
2095
2096        let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2097        let cluster = AvailableCluster;
2098        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2099
2100        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2101        let result = cluster_invoker.invoke(&mut ctx).await;
2102        assert!(result.is_ok());
2103        let rpc = result.unwrap();
2104        assert!(!rpc.is_error());
2105        assert_eq!(rpc.value, Some(b"192.168.1.3".to_vec()));
2106    }
2107
2108    #[tokio::test]
2109    async fn test_available_cluster_all_available() {
2110        let inv1 = Arc::new(AvailableMockInvoker::new(
2111            make_url("192.168.1.1", "50051", "/svc"),
2112            true,
2113            true,
2114        ));
2115        let inv2 = Arc::new(AvailableMockInvoker::new(
2116            make_url("192.168.1.2", "50051", "/svc"),
2117            true,
2118            true,
2119        ));
2120        let inv3 = Arc::new(AvailableMockInvoker::new(
2121            make_url("192.168.1.3", "50051", "/svc"),
2122            true,
2123            true,
2124        ));
2125
2126        let directory = Box::new(MockDirectory::new(vec![inv1, inv2, inv3]));
2127        let cluster = AvailableCluster;
2128        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2129
2130        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2131        let result = cluster_invoker.invoke(&mut ctx).await;
2132        assert!(result.is_ok());
2133        let rpc = result.unwrap();
2134        assert!(!rpc.is_error());
2135        assert_eq!(
2136            rpc.value,
2137            Some(b"192.168.1.1".to_vec()),
2138            "should invoke on the first available invoker"
2139        );
2140    }
2141
2142    #[tokio::test]
2143    async fn test_available_cluster_none_available() {
2144        let inv1 = Arc::new(AvailableMockInvoker::new(
2145            make_url("192.168.1.1", "50051", "/svc"),
2146            false,
2147            true,
2148        ));
2149        let inv2 = Arc::new(AvailableMockInvoker::new(
2150            make_url("192.168.1.2", "50051", "/svc"),
2151            false,
2152            true,
2153        ));
2154
2155        let directory = Box::new(MockDirectory::new(vec![inv1, inv2]));
2156        let cluster = AvailableCluster;
2157        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2158
2159        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2160        let result = cluster_invoker.invoke(&mut ctx).await;
2161        assert!(result.is_err(), "no available invoker should return error");
2162    }
2163
2164    #[tokio::test]
2165    async fn test_available_cluster_empty_directory() {
2166        let directory = Box::new(MockDirectory::new(vec![]));
2167        let cluster = AvailableCluster;
2168        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2169
2170        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2171        let result = cluster_invoker.invoke(&mut ctx).await;
2172        assert!(result.is_err(), "empty directory should return error");
2173    }
2174
2175    #[tokio::test]
2176    async fn test_available_cluster_default_trait() {
2177        let cluster = AvailableCluster;
2178        let directory = Box::new(MockDirectory::new(vec![]));
2179        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2180
2181        assert!(cluster_invoker.is_available());
2182        assert_eq!(cluster_invoker.get_url().path, "/mock");
2183    }
2184
2185    // =========================================================================
2186    // MockCluster tests
2187    // =========================================================================
2188
2189    #[tokio::test]
2190    async fn test_mock_cluster_force_returns_mock_result() {
2191        let invoker = Arc::new(MockInvoker::new(
2192            make_url("192.168.1.1", "50051", "/svc"),
2193            true,
2194        ));
2195        let tracked = Arc::clone(&invoker);
2196        let directory = Box::new(MockDirectory::new(vec![invoker]));
2197        let cluster = MockCluster::new()
2198            .with_force(true)
2199            .with_mock_result(b"mock_data".to_vec());
2200        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2201
2202        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2203        let result = cluster_invoker.invoke(&mut ctx).await;
2204        assert!(result.is_ok());
2205        let rpc = result.unwrap();
2206        assert!(!rpc.is_error());
2207        assert_eq!(rpc.value, Some(b"mock_data".to_vec()));
2208        assert_eq!(
2209            tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2210            0,
2211            "force mode should not call downstream"
2212        );
2213    }
2214
2215    #[tokio::test]
2216    async fn test_mock_cluster_force_default_empty_result() {
2217        let invoker = Arc::new(MockInvoker::new(
2218            make_url("192.168.1.1", "50051", "/svc"),
2219            true,
2220        ));
2221        let directory = Box::new(MockDirectory::new(vec![invoker]));
2222        let cluster = MockCluster::new().with_force(true);
2223        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2224
2225        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2226        let result = cluster_invoker.invoke(&mut ctx).await;
2227        assert!(result.is_ok());
2228        let rpc = result.unwrap();
2229        assert!(!rpc.is_error());
2230        assert_eq!(rpc.value, Some(vec![]));
2231    }
2232
2233    #[tokio::test]
2234    async fn test_mock_cluster_fail_on_success() {
2235        let invoker = Arc::new(MockInvoker::new(
2236            make_url("192.168.1.1", "50051", "/svc"),
2237            true,
2238        ));
2239        let directory = Box::new(MockDirectory::new(vec![invoker]));
2240        let cluster = MockCluster::new().with_mock_result(b"fallback".to_vec());
2241        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2242
2243        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2244        let result = cluster_invoker.invoke(&mut ctx).await;
2245        assert!(result.is_ok());
2246        let rpc = result.unwrap();
2247        assert!(!rpc.is_error());
2248        assert_eq!(
2249            rpc.value,
2250            Some(b"mock_response".to_vec()),
2251            "fail mode should return real result on success"
2252        );
2253    }
2254
2255    #[tokio::test]
2256    async fn test_mock_cluster_fail_on_failure() {
2257        let invoker = Arc::new(MockInvoker::new(
2258            make_url("192.168.1.1", "50051", "/svc"),
2259            false,
2260        ));
2261        let directory = Box::new(MockDirectory::new(vec![invoker]));
2262        let cluster = MockCluster::new().with_mock_result(b"fallback_data".to_vec());
2263        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2264
2265        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2266        let result = cluster_invoker.invoke(&mut ctx).await;
2267        assert!(result.is_ok());
2268        let rpc = result.unwrap();
2269        assert!(!rpc.is_error());
2270        assert_eq!(
2271            rpc.value,
2272            Some(b"fallback_data".to_vec()),
2273            "fail mode should return mock on failure"
2274        );
2275    }
2276
2277    #[tokio::test]
2278    async fn test_mock_cluster_fail_no_mock_configured() {
2279        let invoker = Arc::new(MockInvoker::new(
2280            make_url("192.168.1.1", "50051", "/svc"),
2281            false,
2282        ));
2283        let directory = Box::new(MockDirectory::new(vec![invoker]));
2284        let cluster = MockCluster::new();
2285        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2286
2287        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2288        let result = cluster_invoker.invoke(&mut ctx).await;
2289        assert!(
2290            result.is_ok(),
2291            "should return Ok with RPCResult error when no mock configured"
2292        );
2293        let rpc = result.unwrap();
2294        assert!(
2295            rpc.is_error(),
2296            "should propagate error result without mock fallback"
2297        );
2298    }
2299
2300    #[tokio::test]
2301    async fn test_mock_cluster_force_via_attachment() {
2302        let invoker = Arc::new(MockInvoker::new(
2303            make_url("192.168.1.1", "50051", "/svc"),
2304            true,
2305        ));
2306        let tracked = Arc::clone(&invoker);
2307        let directory = Box::new(MockDirectory::new(vec![invoker]));
2308        let cluster = MockCluster::new().with_mock_result(b"att_mock".to_vec());
2309        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2310
2311        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2312        ctx.attachments.insert("mock".into(), "force".into());
2313        let result = cluster_invoker.invoke(&mut ctx).await;
2314        assert!(result.is_ok());
2315        let rpc = result.unwrap();
2316        assert!(!rpc.is_error());
2317        assert_eq!(rpc.value, Some(b"att_mock".to_vec()));
2318        assert_eq!(
2319            tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2320            0,
2321            "attachment force should not call downstream"
2322        );
2323    }
2324
2325    #[tokio::test]
2326    async fn test_mock_cluster_fail_via_attachment() {
2327        let invoker = Arc::new(MockInvoker::new(
2328            make_url("192.168.1.1", "50051", "/svc"),
2329            false,
2330        ));
2331        let directory = Box::new(MockDirectory::new(vec![invoker]));
2332        let cluster = MockCluster::new().with_mock_result(b"att_fallback".to_vec());
2333        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2334
2335        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2336        ctx.attachments.insert("mock".into(), "fail".into());
2337        let result = cluster_invoker.invoke(&mut ctx).await;
2338        assert!(result.is_ok());
2339        let rpc = result.unwrap();
2340        assert!(!rpc.is_error());
2341        assert_eq!(
2342            rpc.value,
2343            Some(b"att_fallback".to_vec()),
2344            "attachment fail should return mock on failure"
2345        );
2346    }
2347
2348    #[tokio::test]
2349    async fn test_mock_cluster_mock_result_via_attachment() {
2350        let invoker = Arc::new(MockInvoker::new(
2351            make_url("192.168.1.1", "50051", "/svc"),
2352            true,
2353        ));
2354        let tracked = Arc::clone(&invoker);
2355        let directory = Box::new(MockDirectory::new(vec![invoker]));
2356        let cluster = MockCluster::new().with_force(true);
2357        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2358
2359        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2360        ctx.attachments
2361            .insert("mock.result".into(), "from_attachment".into());
2362        let result = cluster_invoker.invoke(&mut ctx).await;
2363        assert!(result.is_ok());
2364        let rpc = result.unwrap();
2365        assert!(!rpc.is_error());
2366        assert_eq!(
2367            rpc.value,
2368            Some(b"from_attachment".to_vec()),
2369            "mock.result attachment should override mock data"
2370        );
2371        assert_eq!(
2372            tracked.call_count.load(std::sync::atomic::Ordering::SeqCst),
2373            0
2374        );
2375    }
2376
2377    #[tokio::test]
2378    async fn test_mock_cluster_empty_directory() {
2379        let directory = Box::new(MockDirectory::new(vec![]));
2380        let cluster = MockCluster::new()
2381            .with_force(true)
2382            .with_mock_result(b"mock".to_vec());
2383        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2384
2385        let mut ctx = InvocationContext::new("sayHello", URL::new("tri", "/svc"));
2386        let result = cluster_invoker.invoke(&mut ctx).await;
2387        assert!(result.is_ok());
2388        let rpc = result.unwrap();
2389        assert!(!rpc.is_error());
2390        assert_eq!(
2391            rpc.value,
2392            Some(b"mock".to_vec()),
2393            "force mode returns mock even with empty directory"
2394        );
2395    }
2396
2397    #[tokio::test]
2398    async fn test_mock_cluster_join_creates_invoker() {
2399        let invoker = Arc::new(MockInvoker::new(
2400            make_url("192.168.1.1", "50051", "/svc"),
2401            true,
2402        ));
2403        let directory = Box::new(MockDirectory::new(vec![invoker]));
2404        let cluster = MockCluster::new().with_force(true);
2405        let cluster_invoker = cluster.join(directory).await.expect("join should succeed");
2406
2407        assert!(cluster_invoker.is_available());
2408        assert_eq!(cluster_invoker.get_url().path, "/mock");
2409    }
2410
2411    #[test]
2412    fn test_mock_cluster_default() {
2413        let cluster = MockCluster::default();
2414        assert!(!cluster.force);
2415        assert!(cluster.mock_result.is_none());
2416    }
2417
2418    #[test]
2419    fn test_mock_cluster_builder() {
2420        let cluster = MockCluster::new()
2421            .with_force(true)
2422            .with_mock_result(b"test".to_vec());
2423        assert!(cluster.force);
2424        assert_eq!(cluster.mock_result, Some(b"test".to_vec()));
2425    }
2426}
2427
2428// Router tests are in a separate module to avoid name clashes
#[cfg(test)]
mod router_tests {
    use super::*;

    /// Builds a `tri` URL for `/com.example.Service` on `host`, applying each
    /// `(key, value)` pair through `set_param`.
    fn make_url_with_params(host: &str, params: &[(&str, &str)]) -> URL {
        let mut url = URL::new("tri", "/com.example.Service");
        url.ip = host.to_owned();
        for &(key, value) in params {
            url.set_param(key, value);
        }
        url
    }

    /// Minimal invoker used as routing input: always available, always
    /// answers `b"ok"`.
    struct RouterTestInvoker {
        url: URL,
    }

    impl RouterTestInvoker {
        /// Wraps `url` in a test invoker.
        fn new(url: URL) -> Self {
            RouterTestInvoker { url }
        }
    }

    impl Node for RouterTestInvoker {
        /// Borrows the URL this invoker was constructed with.
        fn get_url(&self) -> &URL {
            let RouterTestInvoker { url } = self;
            url
        }

        /// Always reports the invoker as available.
        fn is_available(&self) -> bool {
            true
        }

        /// No-op: nothing to tear down.
        fn destroy(&self) {}
    }

    #[async_trait::async_trait]
    impl Invoker for RouterTestInvoker {
        /// Ignores the context and returns a success result carrying `b"ok"`.
        async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
            let payload = b"ok".to_vec();
            Ok(RPCResult::success(payload))
        }
    }

    /// Creates one invoker per entry of `params_list`; the invoker at index
    /// `i` gets host `192.168.1.{i + 1}` and that entry's URL parameters.
    fn make_invokers(params_list: &[&[(&str, &str)]]) -> Vec<Arc<dyn Invoker>> {
        let mut invokers: Vec<Arc<dyn Invoker>> = Vec::with_capacity(params_list.len());
        for (index, params) in params_list.iter().enumerate() {
            let host = format!("192.168.1.{}", index + 1);
            let url = make_url_with_params(&host, params);
            invokers.push(Arc::new(RouterTestInvoker::new(url)));
        }
        invokers
    }

    /// Invocation context for `sayHello` against a `tri` URL on `127.0.0.1`.
    fn make_ctx() -> InvocationContext {
        let mut consumer_url = URL::new("tri", "/com.example.Service");
        consumer_url.ip = "127.0.0.1".into();
        InvocationContext::new("sayHello", consumer_url)
    }

    // ── ConditionRouter ─────────────────────────────────────────────

    /// A rule without `=>` parses into filter rules only.
    #[test]
    fn test_condition_router_parse_simple() {
        let router = ConditionRouter::parse("env=gray").unwrap();

        assert!(router.match_rules.is_empty());
        assert_eq!(router.filter_rules.len(), 1);
        assert_eq!(
            router.filter_rules[0],
            ("env".to_owned(), "gray".to_owned())
        );
    }

    /// The left side of `=>` becomes a match rule, the right side a filter rule.
    #[test]
    fn test_condition_router_parse_with_match() {
        let router = ConditionRouter::parse("region=beijing => env=gray").unwrap();

        assert_eq!(router.match_rules.len(), 1);
        assert_eq!(
            router.match_rules[0],
            ("region".to_owned(), "beijing".to_owned())
        );
        assert_eq!(
            router.filter_rules[0],
            ("env".to_owned(), "gray".to_owned())
        );
    }

    /// A rule with an empty match side matches a context without attachments.
    #[test]
    fn test_condition_router_always_matches_empty() {
        let router = ConditionRouter::parse("=> env=gray").unwrap();
        let invocation = make_ctx();

        assert!(router.matches_invocation(&invocation));
    }

    /// A match rule is satisfied by an attachment with the same key and value.
    #[test]
    fn test_condition_router_matches_with_attachment() {
        let router = ConditionRouter::parse("region=beijing => env=gray").unwrap();
        let mut invocation = make_ctx();
        invocation
            .attachments
            .insert("region".into(), "beijing".into());

        assert!(router.matches_invocation(&invocation));
    }

    /// A match rule is not satisfied when the attachment value differs.
    #[test]
    fn test_condition_router_no_match_wrong_value() {
        let router = ConditionRouter::parse("region=beijing => env=gray").unwrap();
        let mut invocation = make_ctx();
        invocation
            .attachments
            .insert("region".into(), "shanghai".into());

        assert!(!router.matches_invocation(&invocation));
    }

    /// Filtering on `env=gray` keeps the indices of the gray invokers.
    #[test]
    fn test_condition_router_filters_by_param() {
        let candidates =
            make_invokers(&[&[("env", "gray")], &[("env", "prod")], &[("env", "gray")]]);
        let router = ConditionRouter::parse("=> env=gray").unwrap();

        let kept = router.filter_invokers(&candidates);

        assert_eq!(kept, vec![0, 2]);
    }

    // ── TagRouter ───────────────────────────────────────────────────

    /// Without a requested tag, every invoker index is returned.
    #[test]
    fn test_tag_router_all_when_no_tag_requested() {
        let candidates = make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[]]);
        let invocation = make_ctx();
        let tag_router = TagRouter::default();

        let selected = tag_router.route(&candidates, &invocation);

        assert_eq!(selected, vec![0, 1, 2]);
    }

    /// A `dubbo.tag` attachment selects the invokers carrying that tag.
    #[test]
    fn test_tag_router_matches_requested_tag() {
        let candidates =
            make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[("tag", "v1")]]);
        let mut invocation = make_ctx();
        invocation.attachments.insert("dubbo.tag".into(), "v2".into());
        let tag_router = TagRouter::default();

        let selected = tag_router.route(&candidates, &invocation);

        assert_eq!(selected, vec![1]);
    }

    /// When no invoker carries the requested tag, the untagged one is selected.
    #[test]
    fn test_tag_router_fallback_to_untagged() {
        let candidates = make_invokers(&[&[("tag", "v1")], &[]]);
        let mut invocation = make_ctx();
        invocation.attachments.insert("dubbo.tag".into(), "v2".into());
        let tag_router = TagRouter::default();

        let selected = tag_router.route(&candidates, &invocation);

        assert_eq!(selected, vec![1]);
    }

    // ── RouterChain ─────────────────────────────────────────────────

    /// A default chain returns every invoker index.
    #[test]
    fn test_router_chain_empty_returns_all() {
        let candidates = make_invokers(&[&[], &[]]);
        let chain = RouterChain::default();

        let selected = chain.route(&candidates, &make_ctx());

        assert_eq!(selected, vec![0, 1]);
    }

    /// A condition router followed by a tag router leaves only the invoker
    /// that is both `env=gray` and tagged `v1`.
    #[test]
    fn test_router_chain_condition_then_tag() {
        let candidates = make_invokers(&[
            &[("env", "gray"), ("tag", "v1")],
            &[("env", "gray"), ("tag", "v2")],
            &[("env", "prod"), ("tag", "v1")],
        ]);
        let mut invocation = make_ctx();
        invocation.attachments.insert("dubbo.tag".into(), "v1".into());

        let gray_only = ConditionRouter::parse("=> env=gray").unwrap();
        let chain = RouterChain::new()
            .with_condition_router(gray_only)
            .with_tag_router(TagRouter::default());

        let selected = chain.route(&candidates, &invocation);

        assert_eq!(selected, vec![0]);
    }

    /// A rule with nothing after `=>` yields match rules and no filter rules.
    #[test]
    fn test_condition_router_parse_empty_filter() {
        let router = ConditionRouter::parse("region=beijing =>").unwrap();

        assert_eq!(router.match_rules.len(), 1);
        assert_eq!(
            router.match_rules[0],
            ("region".to_owned(), "beijing".to_owned())
        );
        assert!(router.filter_rules.is_empty());
    }

    /// Comma-separated pairs without `=>` all become filter rules, in order.
    #[test]
    fn test_condition_router_parse_kv_pairs_directly() {
        let router = ConditionRouter::parse("a=1,b=2").unwrap();

        assert!(router.match_rules.is_empty());
        assert_eq!(router.filter_rules.len(), 2);
        assert_eq!(router.filter_rules[0], ("a".to_owned(), "1".to_owned()));
        assert_eq!(router.filter_rules[1], ("b".to_owned(), "2".to_owned()));
    }

    /// Filtering on a parameter no invoker has yields no indices.
    #[test]
    fn test_condition_router_filter_no_match() {
        let candidates = make_invokers(&[
            &[("env", "prod")],
            &[("env", "staging")],
            &[("env", "prod")],
        ]);
        let router = ConditionRouter::parse("=> zone=us-west").unwrap();

        let kept = router.filter_invokers(&candidates);

        assert!(
            kept.is_empty(),
            "no invoker has 'zone' param, result should be empty"
        );
    }

    /// `with_tag_key` makes the router read the requested tag from that
    /// attachment key.
    #[test]
    fn test_tag_router_with_custom_key() {
        let candidates = make_invokers(&[&[("tag", "v1")], &[("tag", "v2")], &[]]);
        let mut invocation = make_ctx();
        invocation
            .attachments
            .insert("custom-key".into(), "v1".into());

        let tag_router = TagRouter::new().with_tag_key("custom-key");
        let selected = tag_router.route(&candidates, &invocation);

        assert_eq!(selected, vec![0]);
    }

    /// When the condition router filters everything out, the chain returns
    /// no indices.
    #[test]
    fn test_router_chain_empty_after_condition() {
        let candidates = make_invokers(&[&[("env", "prod")], &[("env", "prod")]]);
        let gray_only = ConditionRouter::parse("=> env=gray").unwrap();
        let chain = RouterChain::new().with_condition_router(gray_only);

        let selected = chain.route(&candidates, &make_ctx());

        assert!(
            selected.is_empty(),
            "condition router removes all invokers, chain should return empty"
        );
    }
}
2656
2657#[cfg(test)]
2658mod script_router_tests {
2659    use super::*;
2660
2661    fn make_url_with_params(host: &str, params: &[(&str, &str)]) -> URL {
2662        let mut url = URL::new("tri", "/com.example.Service");
2663        url.ip = host.to_string();
2664        for (k, v) in params {
2665            url.set_param(*k, *v);
2666        }
2667        url
2668    }
2669
2670    struct TestInvoker {
2671        url: URL,
2672    }
2673
2674    impl TestInvoker {
2675        fn new(url: URL) -> Self {
2676            Self { url }
2677        }
2678    }
2679
2680    impl Node for TestInvoker {
2681        fn get_url(&self) -> &URL {
2682            &self.url
2683        }
2684        fn is_available(&self) -> bool {
2685            true
2686        }
2687        fn destroy(&self) {}
2688    }
2689
2690    #[async_trait::async_trait]
2691    impl Invoker for TestInvoker {
2692        async fn invoke(&self, _ctx: &mut InvocationContext) -> Result<RPCResult, anyhow::Error> {
2693            Ok(RPCResult::success(b"ok".to_vec()))
2694        }
2695    }
2696
2697    fn make_invokers(params_list: &[&[(&str, &str)]]) -> Vec<Arc<dyn Invoker>> {
2698        params_list
2699            .iter()
2700            .enumerate()
2701            .map(|(i, params)| {
2702                let host = format!("10.0.0.{}", i + 1);
2703                Arc::new(TestInvoker::new(make_url_with_params(&host, params))) as Arc<dyn Invoker>
2704            })
2705            .collect()
2706    }
2707
2708    fn make_ctx() -> InvocationContext {
2709        let mut url = URL::new("tri", "/com.example.Service");
2710        url.ip = "127.0.0.1".into();
2711        InvocationContext::new("sayHello", url)
2712    }
2713
2714    #[test]
2715    fn test_script_router_basic_filter() {
2716        let router = ScriptRouter::new("[0, 2]").unwrap();
2717        let invokers = make_invokers(&[&[], &[], &[]]);
2718        let ctx = make_ctx();
2719        let result = router.route(&invokers, &ctx);
2720        assert_eq!(result, vec![0, 2]);
2721    }
2722
2723    #[test]
2724    fn test_script_router_select_by_ip() {
2725        let script = r#"
2726            let result = [];
2727            for i in 0..invoker_count() {
2728                if invoker_ip(i).starts_with("10.0.0.1") {
2729                    result.push(i);
2730                }
2731            }
2732            result
2733        "#;
2734        let router = ScriptRouter::new(script).unwrap();
2735        let invokers = make_invokers(&[&[], &[], &[]]);
2736        let ctx = make_ctx();
2737        let result = router.route(&invokers, &ctx);
2738        assert_eq!(result, vec![0]);
2739    }
2740
2741    #[test]
2742    fn test_script_router_select_by_param() {
2743        let script = r#"
2744            let result = [];
2745            for i in 0..invoker_count() {
2746                if invoker_has_param(i, "env") && invoker_get_param(i, "env") == "gray" {
2747                    result.push(i);
2748                }
2749            }
2750            result
2751        "#;
2752        let router = ScriptRouter::new(script).unwrap();
2753        let invokers = make_invokers(&[&[("env", "gray")], &[("env", "prod")], &[("env", "gray")]]);
2754        let ctx = make_ctx();
2755        let result = router.route(&invokers, &ctx);
2756        assert_eq!(result, vec![0, 2]);
2757    }
2758
2759    #[test]
2760    fn test_script_router_method_name() {
2761        let script = r#"
2762            if method_name() == "sayHello" {
2763                [0, 1]
2764            } else {
2765                []
2766            }
2767        "#;
2768        let router = ScriptRouter::new(script).unwrap();
2769        let invokers = make_invokers(&[&[], &[]]);
2770        let ctx = make_ctx();
2771        let result = router.route(&invokers, &ctx);
2772        assert_eq!(result, vec![0, 1]);
2773
2774        let mut url = URL::new("tri", "/com.example.Service");
2775        url.ip = "127.0.0.1".into();
2776        let ctx2 = InvocationContext::new("byebye", url);
2777        let result2 = router.route(&invokers, &ctx2);
2778        assert!(result2.is_empty());
2779    }
2780
2781    #[test]
2782    fn test_script_router_attachment_based() {
2783        let script = r#"
2784            if has_attachment("region") && get_attachment("region") == "beijing" {
2785                [1]
2786            } else {
2787                [0]
2788            }
2789        "#;
2790        let router = ScriptRouter::new(script).unwrap();
2791        let invokers = make_invokers(&[&[], &[]]);
2792
2793        let mut ctx = make_ctx();
2794        ctx.attachments.insert("region".into(), "beijing".into());
2795        let result = router.route(&invokers, &ctx);
2796        assert_eq!(result, vec![1]);
2797
2798        let ctx_no_att = make_ctx();
2799        let result2 = router.route(&invokers, &ctx_no_att);
2800        assert_eq!(result2, vec![0]);
2801    }
2802
2803    #[test]
2804    fn test_script_router_invalid_script() {
2805        let result = ScriptRouter::new("fn (");
2806        assert!(result.is_err(), "bad script should fail to compile");
2807    }
2808
2809    #[test]
2810    fn test_script_router_out_of_bounds() {
2811        let router = ScriptRouter::new("[99, 200]").unwrap();
2812        let invokers = make_invokers(&[&[], &[]]);
2813        let ctx = make_ctx();
2814        let result = router.route(&invokers, &ctx);
2815        assert!(
2816            result.is_empty(),
2817            "out-of-bounds indices should produce empty result"
2818        );
2819    }
2820
2821    #[test]
2822    fn test_script_router_chain_integration() {
2823        let script = r#"
2824            let result = [];
2825            for i in 0..invoker_count() {
2826                if invoker_has_param(i, "env") && invoker_get_param(i, "env") == "gray" {
2827                    result.push(i);
2828                }
2829            }
2830            result
2831        "#;
2832        let sr = ScriptRouter::new(script).unwrap();
2833        let invokers = make_invokers(&[
2834            &[("env", "gray"), ("tag", "v1")],
2835            &[("env", "prod")],
2836            &[("env", "gray"), ("tag", "v2")],
2837        ]);
2838        let mut ctx = make_ctx();
2839        ctx.attachments.insert("dubbo.tag".into(), "v1".into());
2840
2841        let chain = RouterChain::new()
2842            .with_tag_router(TagRouter::default())
2843            .with_script_router(sr);
2844        let result = chain.route(&invokers, &ctx);
2845        assert_eq!(result, vec![0]);
2846    }
2847}