mssf_util/
resolve.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use crate::retry::OperationRetryer;
7use mssf_core::{
8    ErrorCode,
9    client::{
10        FabricClient,
11        svc_mgmt_client::{PartitionKeyType, ResolvedServicePartition, ServiceManagementClient},
12    },
13    runtime::executor::BoxedCancelToken,
14    types::Uri,
15};
16use std::time::Duration;
17
18/// The same as dotnet sdk:
19/// https://github.com/microsoft/service-fabric-services-and-actors-dotnet/blob/develop/src/Microsoft.ServiceFabric.Services/Client/ServicePartitionResolver.cs
20/// But this does not register notification on resolve success.
21/// User needs to register notification manually on the FabricClient before creating this resolver.
22pub struct ServicePartitionResolver {
23    sm: ServiceManagementClient,
24    retryer: OperationRetryer,
25}
26
27impl ServicePartitionResolver {
28    pub fn new(fc: FabricClient, retryer: OperationRetryer) -> Self {
29        ServicePartitionResolver {
30            sm: fc.get_service_manager().clone(),
31            retryer,
32        }
33    }
34
35    /// Resolve the service partition by name and key type.
36    /// It retries all transient errors and timeouts.
37    #[cfg_attr(
38        feature = "tracing",
39        tracing::instrument(skip_all, fields(uri = %name, timeout = ?timeout), err)
40    )]
41    pub async fn resolve(
42        &self,
43        name: &Uri,
44        key_type: &PartitionKeyType,
45        prev: Option<&ResolvedServicePartition>,
46        timeout: Option<Duration>, // Total timeout for the operation
47        token: Option<BoxedCancelToken>,
48    ) -> mssf_core::Result<ResolvedServicePartition> {
49        // tracing span is auto propagated in async context
50        self.retryer
51            .run(
52                async |t, tk| {
53                    let rsp = self
54                        .sm
55                        .resolve_service_partition(name, key_type, prev, t, tk)
56                        .await?;
57
58                    // Check rsp is valid and save in the cache.
59                    // Sometimes endpoint is empty (may due to service removed), so we need to retry.
60                    if rsp.get_endpoint_list().iter().count() > 0 {
61                        Ok(rsp)
62                    } else {
63                        Err(ErrorCode::FABRIC_E_SERVICE_OFFLINE.into())
64                    }
65                },
66                timeout,
67                token,
68            )
69            .await
70    }
71}