Skip to main content

mssf_core/client/
query_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5use std::{ffi::c_void, time::Duration};
6
7use mssf_com::{
8    FabricClient::{
9        IFabricGetApplicationListResult2, IFabricGetDeployedServiceReplicaDetailResult,
10        IFabricGetNodeListResult2, IFabricGetPartitionListResult2,
11        IFabricGetPartitionLoadInformationResult, IFabricGetReplicaListResult2,
12        IFabricQueryClient10,
13    },
14    FabricTypes::{
15        FABRIC_APPLICATION_QUERY_DESCRIPTION,
16        FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION, FABRIC_NODE_QUERY_DESCRIPTION,
17        FABRIC_NODE_QUERY_DESCRIPTION_EX1, FABRIC_NODE_QUERY_DESCRIPTION_EX2,
18        FABRIC_NODE_QUERY_DESCRIPTION_EX3, FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
19        FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION, FABRIC_SERVICE_QUERY_DESCRIPTION,
20        FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
21    },
22};
23
24use crate::types::{
25    DeployedServiceReplicaDetailQueryDescription, DeployedServiceReplicaDetailQueryResult,
26    GetPartitionLoadInformationResult, NodeListResult, NodeQueryDescription,
27    PartitionLoadInformationQueryDescription, ServicePartitionList,
28    ServicePartitionQueryDescription, ServiceReplicaList, ServiceReplicaQueryDescription,
29};
30use crate::{
31    runtime::executor::BoxedCancelToken,
32    sync::{FabricReceiver, fabric_begin_end_proxy},
33    types::ServiceQueryDescription,
34};
35
36#[derive(Debug, Clone)]
37pub struct QueryClient {
38    com: IFabricQueryClient10,
39}
40
// Internal implementation block.
// The internal functions focus on converting the SF callbacks into async futures,
// while the public API impl focuses on type conversion.
44
45impl QueryClient {
46    pub fn get_node_list_internal(
47        &self,
48        query_description: &FABRIC_NODE_QUERY_DESCRIPTION,
49        timeout_milliseconds: u32,
50        cancellation_token: Option<BoxedCancelToken>,
51    ) -> FabricReceiver<crate::WinResult<IFabricGetNodeListResult2>> {
52        let com1 = &self.com;
53        let com2 = self.com.clone();
54
55        fabric_begin_end_proxy(
56            move |callback| unsafe {
57                com1.BeginGetNodeList(query_description, timeout_milliseconds, callback)
58            },
59            move |ctx| unsafe { com2.EndGetNodeList2(ctx) },
60            cancellation_token,
61        )
62    }
63
64    pub fn get_application_list_internal(
65        &self,
66        query_description: &FABRIC_APPLICATION_QUERY_DESCRIPTION,
67        timeout_milliseconds: u32,
68        cancellation_token: Option<BoxedCancelToken>,
69    ) -> FabricReceiver<crate::WinResult<IFabricGetApplicationListResult2>> {
70        let com1 = &self.com;
71        let com2 = self.com.clone();
72        fabric_begin_end_proxy(
73            move |callback| unsafe {
74                com1.BeginGetApplicationList(query_description, timeout_milliseconds, callback)
75            },
76            move |ctx| unsafe { com2.EndGetApplicationList2(ctx) },
77            cancellation_token,
78        )
79    }
80
81    fn get_service_list_internal(
82        &self,
83        desc: &FABRIC_SERVICE_QUERY_DESCRIPTION,
84        timeout_milliseconds: u32,
85        cancellation_token: Option<BoxedCancelToken>,
86    ) -> FabricReceiver<crate::WinResult<mssf_com::FabricClient::IFabricGetServiceListResult2>>
87    {
88        let com1 = &self.com;
89        let com2 = self.com.clone();
90        fabric_begin_end_proxy(
91            move |callback| unsafe {
92                com1.BeginGetServiceList(desc, timeout_milliseconds, callback)
93            },
94            move |ctx| unsafe { com2.EndGetServiceList2(ctx) },
95            cancellation_token,
96        )
97    }
98
99    fn get_partition_list_internal(
100        &self,
101        desc: &FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION,
102        timeout_milliseconds: u32,
103        cancellation_token: Option<BoxedCancelToken>,
104    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionListResult2>> {
105        let com1 = &self.com;
106        let com2 = self.com.clone();
107        fabric_begin_end_proxy(
108            move |callback| unsafe {
109                com1.BeginGetPartitionList(desc, timeout_milliseconds, callback)
110            },
111            move |ctx| unsafe { com2.EndGetPartitionList2(ctx) },
112            cancellation_token,
113        )
114    }
115
116    fn get_replica_list_internal(
117        &self,
118        desc: &FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
119        timeout_milliseconds: u32,
120        cancellation_token: Option<BoxedCancelToken>,
121    ) -> FabricReceiver<crate::WinResult<IFabricGetReplicaListResult2>> {
122        let com1 = &self.com;
123        let com2 = self.com.clone();
124        fabric_begin_end_proxy(
125            move |callback| unsafe {
126                com1.BeginGetReplicaList(desc, timeout_milliseconds, callback)
127            },
128            move |ctx| unsafe { com2.EndGetReplicaList2(ctx) },
129            cancellation_token,
130        )
131    }
132
133    fn get_partition_load_information_internal(
134        &self,
135        desc: &FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
136        timeout_milliseconds: u32,
137        cancellation_token: Option<BoxedCancelToken>,
138    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionLoadInformationResult>> {
139        let com1 = &self.com;
140        let com2 = self.com.clone();
141        fabric_begin_end_proxy(
142            move |callback| unsafe {
143                com1.BeginGetPartitionLoadInformation(desc, timeout_milliseconds, callback)
144            },
145            move |ctx| unsafe { com2.EndGetPartitionLoadInformation(ctx) },
146            cancellation_token,
147        )
148    }
149
150    fn get_deployed_replica_detail_internal(
151        &self,
152        desc: &FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION,
153        timeout_milliseconds: u32,
154        cancellation_token: Option<BoxedCancelToken>,
155    ) -> FabricReceiver<crate::WinResult<IFabricGetDeployedServiceReplicaDetailResult>> {
156        let com1 = &self.com;
157        let com2 = self.com.clone();
158        fabric_begin_end_proxy(
159            move |callback| unsafe {
160                com1.BeginGetDeployedReplicaDetail(desc, timeout_milliseconds, callback)
161            },
162            move |ctx| unsafe { com2.EndGetDeployedReplicaDetail(ctx) },
163            cancellation_token,
164        )
165    }
166}
167
168impl From<IFabricQueryClient10> for QueryClient {
169    fn from(com: IFabricQueryClient10) -> Self {
170        Self { com }
171    }
172}
173
174impl From<QueryClient> for IFabricQueryClient10 {
175    fn from(value: QueryClient) -> Self {
176        value.com
177    }
178}
179
180impl QueryClient {
181    // List nodes in the cluster
182    pub async fn get_node_list(
183        &self,
184        desc: &NodeQueryDescription,
185        timeout: Duration,
186        cancellation_token: Option<BoxedCancelToken>,
187    ) -> crate::Result<NodeListResult> {
188        // Note that the SF raw structs are scoped to avoid having them across await points.
189        // This makes api Send. All FabricClient api should follow this pattern.
190        let com = {
191            let ex3 = FABRIC_NODE_QUERY_DESCRIPTION_EX3 {
192                MaxResults: desc.paged_query.max_results.unwrap_or(0),
193                Reserved: std::ptr::null_mut(),
194            };
195
196            let ex2 = FABRIC_NODE_QUERY_DESCRIPTION_EX2 {
197                NodeStatusFilter: desc.node_status_filter.bits(),
198                Reserved: std::ptr::addr_of!(ex3) as *mut c_void,
199            };
200
201            let ex1 = FABRIC_NODE_QUERY_DESCRIPTION_EX1 {
202                ContinuationToken: desc.paged_query.continuation_token.as_ref().into(),
203                Reserved: std::ptr::addr_of!(ex2) as *mut c_void,
204            };
205
206            let arg = FABRIC_NODE_QUERY_DESCRIPTION {
207                NodeNameFilter: desc.node_name_filter.as_ref().into(),
208                Reserved: std::ptr::addr_of!(ex1) as *mut c_void,
209            };
210            self.get_node_list_internal(
211                &arg,
212                timeout.as_millis().try_into().unwrap(),
213                cancellation_token,
214            )
215        }
216        .await??;
217        Ok(NodeListResult::from(&com))
218    }
219
220    pub async fn get_application_list(
221        &self,
222        desc: &crate::types::ApplicationQueryDescription,
223        timeout: Duration,
224        cancellation_token: Option<BoxedCancelToken>,
225    ) -> crate::Result<crate::types::ApplicationListResult> {
226        let com = {
227            let (mut base, mut ex1, mut ex2, mut ex3, ex4) = desc.get_raw_parts();
228            base.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
229            #[allow(unused_assignments)]
230            {
231                ex1.Reserved = std::ptr::addr_of!(ex2) as *mut c_void;
232                ex2.Reserved = std::ptr::addr_of!(ex3) as *mut c_void;
233                ex3.Reserved = std::ptr::addr_of!(ex4) as *mut c_void;
234            }
235            self.get_application_list_internal(
236                &base,
237                timeout.as_millis().try_into().unwrap(),
238                cancellation_token,
239            )
240        }
241        .await??;
242        Ok(crate::types::ApplicationListResult::from(&com))
243    }
244    pub async fn get_service_list(
245        &self,
246        desc: &ServiceQueryDescription,
247        timeout: Duration,
248        cancellation_token: Option<BoxedCancelToken>,
249    ) -> crate::Result<crate::types::ServiceListResult> {
250        let com = {
251            let (mut base, mut ex1, mut ex2, ex3) = desc.get_raw_parts();
252            base.Reserved = &ex1 as *const _ as *mut c_void;
253            #[allow(unused_assignments)]
254            {
255                ex1.Reserved = &ex2 as *const _ as *mut c_void;
256                ex2.Reserved = &ex3 as *const _ as *mut c_void;
257            }
258
259            self.get_service_list_internal(&base, timeout.as_millis() as u32, cancellation_token)
260        }
261        .await??;
262        Ok(crate::types::ServiceListResult::from(&com))
263    }
264
265    pub async fn get_partition_list(
266        &self,
267        desc: &ServicePartitionQueryDescription,
268        timeout: Duration,
269        cancellation_token: Option<BoxedCancelToken>,
270    ) -> crate::Result<ServicePartitionList> {
271        let com = {
272            let raw: FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION = desc.into();
273            let mili = timeout.as_millis() as u32;
274            self.get_partition_list_internal(&raw, mili, cancellation_token)
275        }
276        .await??;
277        Ok(ServicePartitionList::from(&com))
278    }
279
280    pub async fn get_replica_list(
281        &self,
282        desc: &ServiceReplicaQueryDescription,
283        timeout: Duration,
284        cancellation_token: Option<BoxedCancelToken>,
285    ) -> crate::Result<ServiceReplicaList> {
286        let com = {
287            let raw: FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION = desc.into();
288            let mili = timeout.as_millis() as u32;
289            self.get_replica_list_internal(&raw, mili, cancellation_token)
290        }
291        .await??;
292        Ok(ServiceReplicaList::from(&com))
293    }
294
295    pub async fn get_partition_load_information(
296        &self,
297        desc: &PartitionLoadInformationQueryDescription,
298        timeout: Duration,
299        cancellation_token: Option<BoxedCancelToken>,
300    ) -> crate::Result<GetPartitionLoadInformationResult> {
301        let com = {
302            let raw: FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION = desc.into();
303            let timeout_ms = timeout.as_micros() as u32;
304            self.get_partition_load_information_internal(&raw, timeout_ms, cancellation_token)
305        }
306        .await??;
307        Ok(GetPartitionLoadInformationResult::from(&com))
308    }
309
310    pub async fn get_deployed_replica_detail(
311        &self,
312        desc: &DeployedServiceReplicaDetailQueryDescription,
313        timeout: Duration,
314        cancellation_token: Option<BoxedCancelToken>,
315    ) -> crate::Result<DeployedServiceReplicaDetailQueryResult> {
316        let com = {
317            let raw: FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION = desc.into();
318            let timeout_ms = timeout.as_micros() as u32;
319            self.get_deployed_replica_detail_internal(&raw, timeout_ms, cancellation_token)
320        }
321        .await??;
322        Ok(DeployedServiceReplicaDetailQueryResult::new(com))
323    }
324}