mssf_util/
resolve.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{pin::Pin, time::Duration};
7
8use mssf_core::runtime::executor::{BoxedCancelToken, Timer};
9use mssf_core::{ErrorCode, WString};
10
11use mssf_core::client::{
12    FabricClient,
13    svc_mgmt_client::{PartitionKeyType, ResolvedServicePartition, ServiceManagementClient},
14};
15
16/// The same as dotnet sdk:
17/// https://github.com/microsoft/service-fabric-services-and-actors-dotnet/blob/develop/src/Microsoft.ServiceFabric.Services/Client/ServicePartitionResolver.cs
18/// But this does not register notification on resolve success.
19/// User needs to register notification manually on the FabricClient before creating this resolver.
20pub struct ServicePartitionResolver {
21    sm: ServiceManagementClient,
22    timer: Box<dyn Timer>,
23    default_timeout: Duration,
24    max_retry_interval: Duration,
25}
26
27/// TimeCounter is used to track elapsed time and remaining time for operations.
28struct TimeCounter {
29    timeout: Duration,
30    start: std::time::Instant,
31}
32
33impl TimeCounter {
34    pub fn new(timeout: Duration) -> Self {
35        TimeCounter {
36            timeout,
37            start: std::time::Instant::now(),
38        }
39    }
40
41    pub fn elapsed(&self) -> Duration {
42        self.start.elapsed()
43    }
44
45    pub fn remaining(&self) -> mssf_core::Result<Duration> {
46        if self.elapsed() < self.timeout {
47            Ok(self.timeout - self.elapsed())
48        } else {
49            Err(ErrorCode::FABRIC_E_TIMEOUT.into())
50        }
51    }
52
53    /// returns a future that will sleep until the remaining time is up.
54    pub fn sleep_until_remaining(
55        &self,
56        timer: &dyn Timer,
57    ) -> mssf_core::Result<impl Future<Output = ()>> {
58        let remaining = self.remaining()?;
59        Ok(timer.sleep(remaining))
60    }
61}
62
63pub struct ServicePartitionResolverBuilder {
64    fc: FabricClient,
65    timer: Option<Box<dyn Timer>>,
66    default_timeout: Option<Duration>,
67    default_max_retry_interval: Option<Duration>,
68}
69
70impl ServicePartitionResolverBuilder {
71    pub fn new(fc: FabricClient) -> Self {
72        ServicePartitionResolverBuilder {
73            fc,
74            timer: None,
75            default_timeout: None,
76            default_max_retry_interval: None,
77        }
78    }
79
80    /// With a runtime timer to use for sleeping.
81    pub fn with_timer(mut self, timer: Box<dyn Timer>) -> Self {
82        self.timer = Some(timer);
83        self
84    }
85
86    pub fn build(self) -> ServicePartitionResolver {
87        ServicePartitionResolver {
88            sm: self.fc.get_service_manager().clone(),
89            timer: self.timer.unwrap_or(Box::new(crate::tokio::TokioTimer)),
90            default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(30)),
91            max_retry_interval: self
92                .default_max_retry_interval
93                .unwrap_or(Duration::from_secs(5)),
94        }
95    }
96}
97
98impl ServicePartitionResolver {
99    pub fn builder(fc: FabricClient) -> ServicePartitionResolverBuilder {
100        ServicePartitionResolverBuilder::new(fc)
101    }
102
103    /// Resolve the service partition by name and key type.
104    /// It retries all transient errors and timeouts.
105    #[cfg_attr(
106        feature = "tracing",
107        tracing::instrument(skip_all, fields(uri = %name, timeout = ?timeout.unwrap_or(self.default_timeout)), err)
108    )]
109    pub async fn resolve(
110        &self,
111        name: &WString,
112        key_type: &PartitionKeyType,
113        prev: Option<&ResolvedServicePartition>,
114        timeout: Option<Duration>, // Total timeout for the operation
115        token: Option<BoxedCancelToken>,
116    ) -> mssf_core::Result<ResolvedServicePartition> {
117        let timeout = timeout.unwrap_or(self.default_timeout);
118        let timer = TimeCounter::new(timeout);
119        let mut cancel: Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
120            if let Some(t) = &token {
121                t.wait()
122            } else {
123                Box::pin(std::future::pending())
124            };
125        loop {
126            let rsp_res = tokio::select! {
127                _ = timer.sleep_until_remaining(self.timer.as_ref())? => {
128                    // Timeout reached, return error.
129                    return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
130                }
131                _ = &mut cancel => {
132                    // Cancellation requested, return error.
133                    return Err(ErrorCode::E_ABORT.into());
134                }
135                rsp_opt = self
136                    .sm
137                    .resolve_service_partition(name, key_type, prev, timer.remaining()?, token.clone()) => rsp_opt,
138            };
139            let rsp_opt = match rsp_res {
140                Ok(partition) => Some(partition),
141                Err(e) => match e.try_as_fabric_error_code() {
142                    Ok(ec) => {
143                        if ec == ErrorCode::FABRIC_E_TIMEOUT || ec.is_transient() {
144                            #[cfg(feature = "tracing")]
145                            tracing::debug!(
146                                "Service partition transient error {ec}. Remaining time {:?}. Retrying...",
147                                timer.remaining()?
148                            );
149                            // do nothing, retry.
150                            None
151                        } else {
152                            return Err(e);
153                        }
154                    }
155                    _ => return Err(e),
156                },
157            };
158
159            // Check rsp is valid and save in the cache.
160            // Sometimes endpoint is empty (may due to service removed), so we need to retry.
161            if let Some(rsp) = rsp_opt
162                && rsp.get_endpoint_list().iter().count() > 0
163            {
164                return Ok(rsp);
165            }
166            // sleep for a while before retrying.
167            tokio::select! {
168                _ = self.timer.sleep(self.max_retry_interval) => {},
169                _ = timer.sleep_until_remaining(self.timer.as_ref())? => {
170                    // Timeout reached, return error.
171                    return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
172                }
173                _ = &mut cancel => {
174                    // Cancellation requested, return error.
175                    return Err(ErrorCode::E_ABORT.into());
176                }
177            }
178        }
179    }
180}