1use std::{pin::Pin, time::Duration};
7
8use mssf_core::runtime::executor::{BoxedCancelToken, Timer};
9use mssf_core::{ErrorCode, WString};
10
11use mssf_core::client::{
12 FabricClient,
13 svc_mgmt_client::{PartitionKeyType, ResolvedServicePartition, ServiceManagementClient},
14};
15
/// Resolves service partitions through the Service Fabric service manager,
/// retrying transient failures until a result with endpoints is obtained,
/// the time budget is exhausted, or the caller cancels.
pub struct ServicePartitionResolver {
    /// Service management client used to issue the resolve calls.
    sm: ServiceManagementClient,
    /// Timer used for the retry pause and for the overall deadline sleep.
    timer: Box<dyn Timer>,
    /// Time budget applied by `resolve` when the caller passes no timeout.
    default_timeout: Duration,
    /// Pause inserted between two consecutive resolve attempts.
    max_retry_interval: Duration,
}
26
/// Tracks how much of a fixed time budget is left since the counter was created.
struct TimeCounter {
    /// Total time budget.
    timeout: Duration,
    /// Instant captured when the counter was constructed.
    start: std::time::Instant,
}
32
33impl TimeCounter {
34 pub fn new(timeout: Duration) -> Self {
35 TimeCounter {
36 timeout,
37 start: std::time::Instant::now(),
38 }
39 }
40
41 pub fn elapsed(&self) -> Duration {
42 self.start.elapsed()
43 }
44
45 pub fn remaining(&self) -> mssf_core::Result<Duration> {
46 if self.elapsed() < self.timeout {
47 Ok(self.timeout - self.elapsed())
48 } else {
49 Err(ErrorCode::FABRIC_E_TIMEOUT.into())
50 }
51 }
52
53 pub fn sleep_until_remaining(
55 &self,
56 timer: &dyn Timer,
57 ) -> mssf_core::Result<impl Future<Output = ()>> {
58 let remaining = self.remaining()?;
59 Ok(timer.sleep(remaining))
60 }
61}
62
/// Builder for [`ServicePartitionResolver`].
///
/// Every optional setting left as `None` is replaced by a default in `build`.
pub struct ServicePartitionResolverBuilder {
    /// Fabric client from which the service manager is obtained.
    fc: FabricClient,
    /// Optional timer implementation used for sleeping.
    timer: Option<Box<dyn Timer>>,
    /// Optional default time budget for a resolve call.
    default_timeout: Option<Duration>,
    /// Optional pause between resolve attempts.
    default_max_retry_interval: Option<Duration>,
}
69
70impl ServicePartitionResolverBuilder {
71 pub fn new(fc: FabricClient) -> Self {
72 ServicePartitionResolverBuilder {
73 fc,
74 timer: None,
75 default_timeout: None,
76 default_max_retry_interval: None,
77 }
78 }
79
80 pub fn with_timer(mut self, timer: Box<dyn Timer>) -> Self {
82 self.timer = Some(timer);
83 self
84 }
85
86 pub fn build(self) -> ServicePartitionResolver {
87 ServicePartitionResolver {
88 sm: self.fc.get_service_manager().clone(),
89 timer: self.timer.unwrap_or(Box::new(crate::tokio::TokioTimer)),
90 default_timeout: self.default_timeout.unwrap_or(Duration::from_secs(30)),
91 max_retry_interval: self
92 .default_max_retry_interval
93 .unwrap_or(Duration::from_secs(5)),
94 }
95 }
96}
97
98impl ServicePartitionResolver {
99 pub fn builder(fc: FabricClient) -> ServicePartitionResolverBuilder {
100 ServicePartitionResolverBuilder::new(fc)
101 }
102
103 #[cfg_attr(
106 feature = "tracing",
107 tracing::instrument(skip_all, fields(uri = %name, timeout = ?timeout.unwrap_or(self.default_timeout)), err)
108 )]
109 pub async fn resolve(
110 &self,
111 name: &WString,
112 key_type: &PartitionKeyType,
113 prev: Option<&ResolvedServicePartition>,
114 timeout: Option<Duration>, token: Option<BoxedCancelToken>,
116 ) -> mssf_core::Result<ResolvedServicePartition> {
117 let timeout = timeout.unwrap_or(self.default_timeout);
118 let timer = TimeCounter::new(timeout);
119 let mut cancel: Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
120 if let Some(t) = &token {
121 t.wait()
122 } else {
123 Box::pin(std::future::pending())
124 };
125 loop {
126 let rsp_res = tokio::select! {
127 _ = timer.sleep_until_remaining(self.timer.as_ref())? => {
128 return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
130 }
131 _ = &mut cancel => {
132 return Err(ErrorCode::E_ABORT.into());
134 }
135 rsp_opt = self
136 .sm
137 .resolve_service_partition(name, key_type, prev, timer.remaining()?, token.clone()) => rsp_opt,
138 };
139 let rsp_opt = match rsp_res {
140 Ok(partition) => Some(partition),
141 Err(e) => match e.try_as_fabric_error_code() {
142 Ok(ec) => {
143 if ec == ErrorCode::FABRIC_E_TIMEOUT || ec.is_transient() {
144 #[cfg(feature = "tracing")]
145 tracing::debug!(
146 "Service partition transient error {ec}. Remaining time {:?}. Retrying...",
147 timer.remaining()?
148 );
149 None
151 } else {
152 return Err(e);
153 }
154 }
155 _ => return Err(e),
156 },
157 };
158
159 if let Some(rsp) = rsp_opt
162 && rsp.get_endpoint_list().iter().count() > 0
163 {
164 return Ok(rsp);
165 }
166 tokio::select! {
168 _ = self.timer.sleep(self.max_retry_interval) => {},
169 _ = timer.sleep_until_remaining(self.timer.as_ref())? => {
170 return Err(ErrorCode::FABRIC_E_TIMEOUT.into());
172 }
173 _ = &mut cancel => {
174 return Err(ErrorCode::E_ABORT.into());
176 }
177 }
178 }
179 }
180}