mssf_core/client/
query_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5use std::{ffi::c_void, time::Duration};
6
7use mssf_com::{
8    FabricClient::{
9        IFabricGetApplicationListResult2, IFabricGetDeployedServiceReplicaDetailResult,
10        IFabricGetNodeListResult2, IFabricGetPartitionListResult2,
11        IFabricGetPartitionLoadInformationResult, IFabricGetReplicaListResult2,
12        IFabricQueryClient10,
13    },
14    FabricTypes::{
15        FABRIC_APPLICATION_QUERY_DESCRIPTION,
16        FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION, FABRIC_NODE_QUERY_DESCRIPTION,
17        FABRIC_NODE_QUERY_DESCRIPTION_EX1, FABRIC_NODE_QUERY_DESCRIPTION_EX2,
18        FABRIC_NODE_QUERY_DESCRIPTION_EX3, FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
19        FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION, FABRIC_SERVICE_QUERY_DESCRIPTION,
20        FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
21    },
22};
23
24use crate::{
25    runtime::executor::BoxedCancelToken,
26    sync::{FabricReceiver, fabric_begin_end_proxy},
27    types::ServiceQueryDescription,
28};
29use crate::{
30    strings::get_pcwstr_from_opt,
31    types::{
32        DeployedServiceReplicaDetailQueryDescription, DeployedServiceReplicaDetailQueryResult,
33        NodeListResult, NodeQueryDescription, PartitionLoadInformation,
34        PartitionLoadInformationQueryDescription, ServicePartitionList,
35        ServicePartitionQueryDescription, ServiceReplicaList, ServiceReplicaQueryDescription,
36    },
37};
38
39#[derive(Debug, Clone)]
40pub struct QueryClient {
41    com: IFabricQueryClient10,
42}
43
// Internal implementation block.
// Internal functions focus on converting SF callbacks into async futures,
// while the public API impl focuses on type conversion.
47
48impl QueryClient {
49    pub fn get_node_list_internal(
50        &self,
51        query_description: &FABRIC_NODE_QUERY_DESCRIPTION,
52        timeout_milliseconds: u32,
53        cancellation_token: Option<BoxedCancelToken>,
54    ) -> FabricReceiver<crate::WinResult<IFabricGetNodeListResult2>> {
55        let com1 = &self.com;
56        let com2 = self.com.clone();
57
58        fabric_begin_end_proxy(
59            move |callback| unsafe {
60                com1.BeginGetNodeList(query_description, timeout_milliseconds, callback)
61            },
62            move |ctx| unsafe { com2.EndGetNodeList2(ctx) },
63            cancellation_token,
64        )
65    }
66
67    pub fn get_application_list_internal(
68        &self,
69        query_description: &FABRIC_APPLICATION_QUERY_DESCRIPTION,
70        timeout_milliseconds: u32,
71        cancellation_token: Option<BoxedCancelToken>,
72    ) -> FabricReceiver<crate::WinResult<IFabricGetApplicationListResult2>> {
73        let com1 = &self.com;
74        let com2 = self.com.clone();
75        fabric_begin_end_proxy(
76            move |callback| unsafe {
77                com1.BeginGetApplicationList(query_description, timeout_milliseconds, callback)
78            },
79            move |ctx| unsafe { com2.EndGetApplicationList2(ctx) },
80            cancellation_token,
81        )
82    }
83
84    fn get_service_list_internal(
85        &self,
86        desc: &FABRIC_SERVICE_QUERY_DESCRIPTION,
87        timeout_milliseconds: u32,
88        cancellation_token: Option<BoxedCancelToken>,
89    ) -> FabricReceiver<crate::WinResult<mssf_com::FabricClient::IFabricGetServiceListResult2>>
90    {
91        let com1 = &self.com;
92        let com2 = self.com.clone();
93        fabric_begin_end_proxy(
94            move |callback| unsafe {
95                com1.BeginGetServiceList(desc, timeout_milliseconds, callback)
96            },
97            move |ctx| unsafe { com2.EndGetServiceList2(ctx) },
98            cancellation_token,
99        )
100    }
101
102    fn get_partition_list_internal(
103        &self,
104        desc: &FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION,
105        timeout_milliseconds: u32,
106        cancellation_token: Option<BoxedCancelToken>,
107    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionListResult2>> {
108        let com1 = &self.com;
109        let com2 = self.com.clone();
110        fabric_begin_end_proxy(
111            move |callback| unsafe {
112                com1.BeginGetPartitionList(desc, timeout_milliseconds, callback)
113            },
114            move |ctx| unsafe { com2.EndGetPartitionList2(ctx) },
115            cancellation_token,
116        )
117    }
118
119    fn get_replica_list_internal(
120        &self,
121        desc: &FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
122        timeout_milliseconds: u32,
123        cancellation_token: Option<BoxedCancelToken>,
124    ) -> FabricReceiver<crate::WinResult<IFabricGetReplicaListResult2>> {
125        let com1 = &self.com;
126        let com2 = self.com.clone();
127        fabric_begin_end_proxy(
128            move |callback| unsafe {
129                com1.BeginGetReplicaList(desc, timeout_milliseconds, callback)
130            },
131            move |ctx| unsafe { com2.EndGetReplicaList2(ctx) },
132            cancellation_token,
133        )
134    }
135
136    fn get_partition_load_information_internal(
137        &self,
138        desc: &FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
139        timeout_milliseconds: u32,
140        cancellation_token: Option<BoxedCancelToken>,
141    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionLoadInformationResult>> {
142        let com1 = &self.com;
143        let com2 = self.com.clone();
144        fabric_begin_end_proxy(
145            move |callback| unsafe {
146                com1.BeginGetPartitionLoadInformation(desc, timeout_milliseconds, callback)
147            },
148            move |ctx| unsafe { com2.EndGetPartitionLoadInformation(ctx) },
149            cancellation_token,
150        )
151    }
152
153    fn get_deployed_replica_detail_internal(
154        &self,
155        desc: &FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION,
156        timeout_milliseconds: u32,
157        cancellation_token: Option<BoxedCancelToken>,
158    ) -> FabricReceiver<crate::WinResult<IFabricGetDeployedServiceReplicaDetailResult>> {
159        let com1 = &self.com;
160        let com2 = self.com.clone();
161        fabric_begin_end_proxy(
162            move |callback| unsafe {
163                com1.BeginGetDeployedReplicaDetail(desc, timeout_milliseconds, callback)
164            },
165            move |ctx| unsafe { com2.EndGetDeployedReplicaDetail(ctx) },
166            cancellation_token,
167        )
168    }
169}
170
171impl From<IFabricQueryClient10> for QueryClient {
172    fn from(com: IFabricQueryClient10) -> Self {
173        Self { com }
174    }
175}
176
177impl From<QueryClient> for IFabricQueryClient10 {
178    fn from(value: QueryClient) -> Self {
179        value.com
180    }
181}
182
183impl QueryClient {
184    // List nodes in the cluster
185    pub async fn get_node_list(
186        &self,
187        desc: &NodeQueryDescription,
188        timeout: Duration,
189        cancellation_token: Option<BoxedCancelToken>,
190    ) -> crate::Result<NodeListResult> {
191        // Note that the SF raw structs are scoped to avoid having them across await points.
192        // This makes api Send. All FabricClient api should follow this pattern.
193        let com = {
194            let ex3 = FABRIC_NODE_QUERY_DESCRIPTION_EX3 {
195                MaxResults: desc.paged_query.max_results.unwrap_or(0),
196                Reserved: std::ptr::null_mut(),
197            };
198
199            let ex2 = FABRIC_NODE_QUERY_DESCRIPTION_EX2 {
200                NodeStatusFilter: desc.node_status_filter.bits(),
201                Reserved: std::ptr::addr_of!(ex3) as *mut c_void,
202            };
203
204            let ex1 = FABRIC_NODE_QUERY_DESCRIPTION_EX1 {
205                ContinuationToken: get_pcwstr_from_opt(&desc.paged_query.continuation_token),
206                Reserved: std::ptr::addr_of!(ex2) as *mut c_void,
207            };
208
209            let arg = FABRIC_NODE_QUERY_DESCRIPTION {
210                NodeNameFilter: get_pcwstr_from_opt(&desc.node_name_filter),
211                Reserved: std::ptr::addr_of!(ex1) as *mut c_void,
212            };
213            self.get_node_list_internal(
214                &arg,
215                timeout.as_millis().try_into().unwrap(),
216                cancellation_token,
217            )
218        }
219        .await??;
220        Ok(NodeListResult::from(com))
221    }
222
223    pub async fn get_application_list(
224        &self,
225        desc: &crate::types::ApplicationQueryDescription,
226        timeout: Duration,
227        cancellation_token: Option<BoxedCancelToken>,
228    ) -> crate::Result<crate::types::ApplicationListResult> {
229        let com = {
230            let (mut base, mut ex1, mut ex2, mut ex3, ex4) = desc.get_raw_parts();
231            base.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
232            ex1.Reserved = std::ptr::addr_of!(ex2) as *mut c_void;
233            ex2.Reserved = std::ptr::addr_of!(ex3) as *mut c_void;
234            ex3.Reserved = std::ptr::addr_of!(ex4) as *mut c_void;
235            self.get_application_list_internal(
236                &base,
237                timeout.as_millis().try_into().unwrap(),
238                cancellation_token,
239            )
240        }
241        .await??;
242        Ok(crate::types::ApplicationListResult::from(&com))
243    }
244    pub async fn get_service_list(
245        &self,
246        desc: &ServiceQueryDescription,
247        timeout: Duration,
248        cancellation_token: Option<BoxedCancelToken>,
249    ) -> crate::Result<crate::types::ServiceListResult> {
250        let com = {
251            let (mut base, mut ex1, mut ex2, ex3) = desc.get_raw_parts();
252            base.Reserved = &ex1 as *const _ as *mut c_void;
253            ex1.Reserved = &ex2 as *const _ as *mut c_void;
254            ex2.Reserved = &ex3 as *const _ as *mut c_void;
255
256            self.get_service_list_internal(&base, timeout.as_millis() as u32, cancellation_token)
257        }
258        .await??;
259        Ok(crate::types::ServiceListResult::from(&com))
260    }
261
262    pub async fn get_partition_list(
263        &self,
264        desc: &ServicePartitionQueryDescription,
265        timeout: Duration,
266        cancellation_token: Option<BoxedCancelToken>,
267    ) -> crate::Result<ServicePartitionList> {
268        let com = {
269            let raw: FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION = desc.into();
270            let mili = timeout.as_millis() as u32;
271            self.get_partition_list_internal(&raw, mili, cancellation_token)
272        }
273        .await??;
274        Ok(ServicePartitionList::new(com))
275    }
276
277    pub async fn get_replica_list(
278        &self,
279        desc: &ServiceReplicaQueryDescription,
280        timeout: Duration,
281        cancellation_token: Option<BoxedCancelToken>,
282    ) -> crate::Result<ServiceReplicaList> {
283        let com = {
284            let raw: FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION = desc.into();
285            let mili = timeout.as_millis() as u32;
286            self.get_replica_list_internal(&raw, mili, cancellation_token)
287        }
288        .await??;
289        Ok(ServiceReplicaList::new(com))
290    }
291
292    pub async fn get_partition_load_information(
293        &self,
294        desc: &PartitionLoadInformationQueryDescription,
295        timeout: Duration,
296        cancellation_token: Option<BoxedCancelToken>,
297    ) -> crate::Result<PartitionLoadInformation> {
298        let com = {
299            let raw: FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION = desc.into();
300            let timeout_ms = timeout.as_micros() as u32;
301            self.get_partition_load_information_internal(&raw, timeout_ms, cancellation_token)
302        }
303        .await??;
304        Ok(PartitionLoadInformation::new(com))
305    }
306
307    pub async fn get_deployed_replica_detail(
308        &self,
309        desc: &DeployedServiceReplicaDetailQueryDescription,
310        timeout: Duration,
311        cancellation_token: Option<BoxedCancelToken>,
312    ) -> crate::Result<DeployedServiceReplicaDetailQueryResult> {
313        let com = {
314            let raw: FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION = desc.into();
315            let timeout_ms = timeout.as_micros() as u32;
316            self.get_deployed_replica_detail_internal(&raw, timeout_ms, cancellation_token)
317        }
318        .await??;
319        Ok(DeployedServiceReplicaDetailQueryResult::new(com))
320    }
321}