Skip to main content

mssf_core/client/
query_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5use std::time::Duration;
6
7use mssf_com::{
8    FabricClient::{
9        IFabricGetApplicationListResult2, IFabricGetDeployedServiceReplicaDetailResult,
10        IFabricGetNodeListResult2, IFabricGetPartitionListResult2,
11        IFabricGetPartitionLoadInformationResult, IFabricGetReplicaListResult2,
12        IFabricQueryClient13,
13    },
14    FabricTypes::{
15        FABRIC_APPLICATION_QUERY_DESCRIPTION,
16        FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION, FABRIC_NODE_QUERY_DESCRIPTION,
17        FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
18        FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION, FABRIC_SERVICE_QUERY_DESCRIPTION,
19        FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
20    },
21};
22
23use crate::mem::{BoxPool, GetRawWithBoxPool};
24
25use crate::types::{
26    DeployedServiceReplicaDetailQueryDescription, DeployedServiceReplicaDetailQueryResult,
27    GetPartitionLoadInformationResult, NodeListResult, NodeQueryDescription,
28    PartitionLoadInformationQueryDescription, ServicePartitionList,
29    ServicePartitionQueryDescription, ServiceReplicaList, ServiceReplicaQueryDescription,
30};
31use crate::{
32    runtime::executor::BoxedCancelToken,
33    sync::{FabricReceiver, fabric_begin_end_proxy},
34    types::ServiceQueryDescription,
35};
36
37#[derive(Debug, Clone)]
38pub struct QueryClient {
39    com: IFabricQueryClient13,
40}
41
// Internal implementation block
// Internal functions focus on converting SF callbacks into async futures,
// while the public API impl focuses on type conversion.
45
46impl QueryClient {
47    pub fn get_node_list_internal(
48        &self,
49        query_description: &FABRIC_NODE_QUERY_DESCRIPTION,
50        timeout_milliseconds: u32,
51        cancellation_token: Option<BoxedCancelToken>,
52    ) -> FabricReceiver<crate::Result<IFabricGetNodeListResult2>> {
53        let com1 = &self.com;
54        let com2 = self.com.clone();
55
56        fabric_begin_end_proxy(
57            move |callback| unsafe {
58                com1.BeginGetNodeList(query_description, timeout_milliseconds, callback)
59            },
60            move |ctx| unsafe { com2.EndGetNodeList2(ctx) },
61            cancellation_token,
62        )
63    }
64
65    pub fn get_application_list_internal(
66        &self,
67        query_description: &FABRIC_APPLICATION_QUERY_DESCRIPTION,
68        timeout_milliseconds: u32,
69        cancellation_token: Option<BoxedCancelToken>,
70    ) -> FabricReceiver<crate::Result<IFabricGetApplicationListResult2>> {
71        let com1 = &self.com;
72        let com2 = self.com.clone();
73        fabric_begin_end_proxy(
74            move |callback| unsafe {
75                com1.BeginGetApplicationList(query_description, timeout_milliseconds, callback)
76            },
77            move |ctx| unsafe { com2.EndGetApplicationList2(ctx) },
78            cancellation_token,
79        )
80    }
81
82    fn get_service_list_internal(
83        &self,
84        desc: &FABRIC_SERVICE_QUERY_DESCRIPTION,
85        timeout_milliseconds: u32,
86        cancellation_token: Option<BoxedCancelToken>,
87    ) -> FabricReceiver<crate::Result<mssf_com::FabricClient::IFabricGetServiceListResult2>> {
88        let com1 = &self.com;
89        let com2 = self.com.clone();
90        fabric_begin_end_proxy(
91            move |callback| unsafe {
92                com1.BeginGetServiceList(desc, timeout_milliseconds, callback)
93            },
94            move |ctx| unsafe { com2.EndGetServiceList2(ctx) },
95            cancellation_token,
96        )
97    }
98
99    fn get_partition_list_internal(
100        &self,
101        desc: &FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION,
102        timeout_milliseconds: u32,
103        cancellation_token: Option<BoxedCancelToken>,
104    ) -> FabricReceiver<crate::Result<IFabricGetPartitionListResult2>> {
105        let com1 = &self.com;
106        let com2 = self.com.clone();
107        fabric_begin_end_proxy(
108            move |callback| unsafe {
109                com1.BeginGetPartitionList(desc, timeout_milliseconds, callback)
110            },
111            move |ctx| unsafe { com2.EndGetPartitionList2(ctx) },
112            cancellation_token,
113        )
114    }
115
116    fn get_replica_list_internal(
117        &self,
118        desc: &FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
119        timeout_milliseconds: u32,
120        cancellation_token: Option<BoxedCancelToken>,
121    ) -> FabricReceiver<crate::Result<IFabricGetReplicaListResult2>> {
122        let com1 = &self.com;
123        let com2 = self.com.clone();
124        fabric_begin_end_proxy(
125            move |callback| unsafe {
126                com1.BeginGetReplicaList(desc, timeout_milliseconds, callback)
127            },
128            move |ctx| unsafe { com2.EndGetReplicaList2(ctx) },
129            cancellation_token,
130        )
131    }
132
133    fn get_partition_load_information_internal(
134        &self,
135        desc: &FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
136        timeout_milliseconds: u32,
137        cancellation_token: Option<BoxedCancelToken>,
138    ) -> FabricReceiver<crate::Result<IFabricGetPartitionLoadInformationResult>> {
139        let com1 = &self.com;
140        let com2 = self.com.clone();
141        fabric_begin_end_proxy(
142            move |callback| unsafe {
143                com1.BeginGetPartitionLoadInformation(desc, timeout_milliseconds, callback)
144            },
145            move |ctx| unsafe { com2.EndGetPartitionLoadInformation(ctx) },
146            cancellation_token,
147        )
148    }
149
150    fn get_deployed_replica_detail_internal(
151        &self,
152        desc: &FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION,
153        timeout_milliseconds: u32,
154        cancellation_token: Option<BoxedCancelToken>,
155    ) -> FabricReceiver<crate::Result<IFabricGetDeployedServiceReplicaDetailResult>> {
156        let com1 = &self.com;
157        let com2 = self.com.clone();
158        fabric_begin_end_proxy(
159            move |callback| unsafe {
160                com1.BeginGetDeployedReplicaDetail(desc, timeout_milliseconds, callback)
161            },
162            move |ctx| unsafe { com2.EndGetDeployedReplicaDetail(ctx) },
163            cancellation_token,
164        )
165    }
166}
167
168impl From<IFabricQueryClient13> for QueryClient {
169    fn from(com: IFabricQueryClient13) -> Self {
170        Self { com }
171    }
172}
173
174impl From<QueryClient> for IFabricQueryClient13 {
175    fn from(value: QueryClient) -> Self {
176        value.com
177    }
178}
179
180impl QueryClient {
181    // List nodes in the cluster
182    pub async fn get_node_list(
183        &self,
184        desc: &NodeQueryDescription,
185        timeout: Duration,
186        cancellation_token: Option<BoxedCancelToken>,
187    ) -> crate::Result<NodeListResult> {
188        let com = {
189            let mut pool = BoxPool::new();
190            let arg = desc.get_raw_with_pool(&mut pool);
191            self.get_node_list_internal(
192                &arg,
193                timeout.as_millis().try_into().unwrap(),
194                cancellation_token,
195            )
196        }
197        .await??;
198        Ok(NodeListResult::from(&com))
199    }
200
201    pub async fn get_application_list(
202        &self,
203        desc: &crate::types::ApplicationQueryDescription,
204        timeout: Duration,
205        cancellation_token: Option<BoxedCancelToken>,
206    ) -> crate::Result<crate::types::ApplicationListResult> {
207        let com = {
208            let mut pool = BoxPool::new();
209            let arg = desc.get_raw_with_pool(&mut pool);
210            self.get_application_list_internal(
211                &arg,
212                timeout.as_millis().try_into().unwrap(),
213                cancellation_token,
214            )
215        }
216        .await??;
217        Ok(crate::types::ApplicationListResult::from(&com))
218    }
219    pub async fn get_service_list(
220        &self,
221        desc: &ServiceQueryDescription,
222        timeout: Duration,
223        cancellation_token: Option<BoxedCancelToken>,
224    ) -> crate::Result<crate::types::ServiceListResult> {
225        let com = {
226            let mut pool = BoxPool::new();
227            let arg = desc.get_raw_with_pool(&mut pool);
228            self.get_service_list_internal(&arg, timeout.as_millis() as u32, cancellation_token)
229        }
230        .await??;
231        Ok(crate::types::ServiceListResult::from(&com))
232    }
233
234    pub async fn get_partition_list(
235        &self,
236        desc: &ServicePartitionQueryDescription,
237        timeout: Duration,
238        cancellation_token: Option<BoxedCancelToken>,
239    ) -> crate::Result<ServicePartitionList> {
240        let com = {
241            let raw: FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION = desc.into();
242            let mili = timeout.as_millis() as u32;
243            self.get_partition_list_internal(&raw, mili, cancellation_token)
244        }
245        .await??;
246        Ok(ServicePartitionList::from(&com))
247    }
248
249    pub async fn get_replica_list(
250        &self,
251        desc: &ServiceReplicaQueryDescription,
252        timeout: Duration,
253        cancellation_token: Option<BoxedCancelToken>,
254    ) -> crate::Result<ServiceReplicaList> {
255        let com = {
256            let raw: FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION = desc.into();
257            let mili = timeout.as_millis() as u32;
258            self.get_replica_list_internal(&raw, mili, cancellation_token)
259        }
260        .await??;
261        Ok(ServiceReplicaList::from(&com))
262    }
263
264    pub async fn get_partition_load_information(
265        &self,
266        desc: &PartitionLoadInformationQueryDescription,
267        timeout: Duration,
268        cancellation_token: Option<BoxedCancelToken>,
269    ) -> crate::Result<GetPartitionLoadInformationResult> {
270        let com = {
271            let raw: FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION = desc.into();
272            let timeout_ms = timeout.as_micros() as u32;
273            self.get_partition_load_information_internal(&raw, timeout_ms, cancellation_token)
274        }
275        .await??;
276        Ok(GetPartitionLoadInformationResult::from(&com))
277    }
278
279    pub async fn get_deployed_replica_detail(
280        &self,
281        desc: &DeployedServiceReplicaDetailQueryDescription,
282        timeout: Duration,
283        cancellation_token: Option<BoxedCancelToken>,
284    ) -> crate::Result<DeployedServiceReplicaDetailQueryResult> {
285        let com = {
286            let raw: FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION = desc.into();
287            let timeout_ms = timeout.as_micros() as u32;
288            self.get_deployed_replica_detail_internal(&raw, timeout_ms, cancellation_token)
289        }
290        .await??;
291        Ok(DeployedServiceReplicaDetailQueryResult::new(com))
292    }
293}