mssf_core/client/
query_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5use std::{ffi::c_void, time::Duration};
6
7use mssf_com::{
8    FabricClient::{
9        IFabricGetDeployedServiceReplicaDetailResult, IFabricGetNodeListResult2,
10        IFabricGetPartitionListResult2, IFabricGetPartitionLoadInformationResult,
11        IFabricGetReplicaListResult2, IFabricQueryClient10,
12    },
13    FabricTypes::{
14        FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION, FABRIC_NODE_QUERY_DESCRIPTION,
15        FABRIC_NODE_QUERY_DESCRIPTION_EX1, FABRIC_NODE_QUERY_DESCRIPTION_EX2,
16        FABRIC_NODE_QUERY_DESCRIPTION_EX3, FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
17        FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION, FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
18    },
19};
20
21use crate::{
22    runtime::executor::BoxedCancelToken,
23    sync::{FabricReceiver, fabric_begin_end_proxy},
24};
25use crate::{
26    strings::get_pcwstr_from_opt,
27    types::{
28        DeployedServiceReplicaDetailQueryDescription, DeployedServiceReplicaDetailQueryResult,
29        NodeList, NodeQueryDescription, PartitionLoadInformation,
30        PartitionLoadInformationQueryDescription, ServicePartitionList,
31        ServicePartitionQueryDescription, ServiceReplicaList, ServiceReplicaQueryDescription,
32    },
33};
34
35#[derive(Debug, Clone)]
36pub struct QueryClient {
37    com: IFabricQueryClient10,
38}
39
40// Internal implementation block
41// Internal functions focuses on changing SF callback to async future,
42// while the public apis impl focuses on type conversion.
43
44impl QueryClient {
45    pub fn get_node_list_internal(
46        &self,
47        query_description: &FABRIC_NODE_QUERY_DESCRIPTION,
48        timeout_milliseconds: u32,
49        cancellation_token: Option<BoxedCancelToken>,
50    ) -> FabricReceiver<crate::WinResult<IFabricGetNodeListResult2>> {
51        let com1 = &self.com;
52        let com2 = self.com.clone();
53
54        fabric_begin_end_proxy(
55            move |callback| unsafe {
56                com1.BeginGetNodeList(query_description, timeout_milliseconds, callback)
57            },
58            move |ctx| unsafe { com2.EndGetNodeList2(ctx) },
59            cancellation_token,
60        )
61    }
62
63    fn get_partition_list_internal(
64        &self,
65        desc: &FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION,
66        timeout_milliseconds: u32,
67        cancellation_token: Option<BoxedCancelToken>,
68    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionListResult2>> {
69        let com1 = &self.com;
70        let com2 = self.com.clone();
71        fabric_begin_end_proxy(
72            move |callback| unsafe {
73                com1.BeginGetPartitionList(desc, timeout_milliseconds, callback)
74            },
75            move |ctx| unsafe { com2.EndGetPartitionList2(ctx) },
76            cancellation_token,
77        )
78    }
79
80    fn get_replica_list_internal(
81        &self,
82        desc: &FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION,
83        timeout_milliseconds: u32,
84        cancellation_token: Option<BoxedCancelToken>,
85    ) -> FabricReceiver<crate::WinResult<IFabricGetReplicaListResult2>> {
86        let com1 = &self.com;
87        let com2 = self.com.clone();
88        fabric_begin_end_proxy(
89            move |callback| unsafe {
90                com1.BeginGetReplicaList(desc, timeout_milliseconds, callback)
91            },
92            move |ctx| unsafe { com2.EndGetReplicaList2(ctx) },
93            cancellation_token,
94        )
95    }
96
97    fn get_partition_load_information_internal(
98        &self,
99        desc: &FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION,
100        timeout_milliseconds: u32,
101        cancellation_token: Option<BoxedCancelToken>,
102    ) -> FabricReceiver<crate::WinResult<IFabricGetPartitionLoadInformationResult>> {
103        let com1 = &self.com;
104        let com2 = self.com.clone();
105        fabric_begin_end_proxy(
106            move |callback| unsafe {
107                com1.BeginGetPartitionLoadInformation(desc, timeout_milliseconds, callback)
108            },
109            move |ctx| unsafe { com2.EndGetPartitionLoadInformation(ctx) },
110            cancellation_token,
111        )
112    }
113
114    fn get_deployed_replica_detail_internal(
115        &self,
116        desc: &FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION,
117        timeout_milliseconds: u32,
118        cancellation_token: Option<BoxedCancelToken>,
119    ) -> FabricReceiver<crate::WinResult<IFabricGetDeployedServiceReplicaDetailResult>> {
120        let com1 = &self.com;
121        let com2 = self.com.clone();
122        fabric_begin_end_proxy(
123            move |callback| unsafe {
124                com1.BeginGetDeployedReplicaDetail(desc, timeout_milliseconds, callback)
125            },
126            move |ctx| unsafe { com2.EndGetDeployedReplicaDetail(ctx) },
127            cancellation_token,
128        )
129    }
130}
131
132impl From<IFabricQueryClient10> for QueryClient {
133    fn from(com: IFabricQueryClient10) -> Self {
134        Self { com }
135    }
136}
137
138impl From<QueryClient> for IFabricQueryClient10 {
139    fn from(value: QueryClient) -> Self {
140        value.com
141    }
142}
143
144impl QueryClient {
145    // List nodes in the cluster
146    pub async fn get_node_list(
147        &self,
148        desc: &NodeQueryDescription,
149        timeout: Duration,
150        cancellation_token: Option<BoxedCancelToken>,
151    ) -> crate::Result<NodeList> {
152        // Note that the SF raw structs are scoped to avoid having them across await points.
153        // This makes api Send. All FabricClient api should follow this pattern.
154        let com = {
155            let ex3 = FABRIC_NODE_QUERY_DESCRIPTION_EX3 {
156                MaxResults: desc.paged_query.max_results.unwrap_or(0),
157                Reserved: std::ptr::null_mut(),
158            };
159
160            let ex2 = FABRIC_NODE_QUERY_DESCRIPTION_EX2 {
161                NodeStatusFilter: desc.node_status_filter.bits(),
162                Reserved: std::ptr::addr_of!(ex3) as *mut c_void,
163            };
164
165            let ex1 = FABRIC_NODE_QUERY_DESCRIPTION_EX1 {
166                ContinuationToken: get_pcwstr_from_opt(&desc.paged_query.continuation_token),
167                Reserved: std::ptr::addr_of!(ex2) as *mut c_void,
168            };
169
170            let arg = FABRIC_NODE_QUERY_DESCRIPTION {
171                NodeNameFilter: get_pcwstr_from_opt(&desc.node_name_filter),
172                Reserved: std::ptr::addr_of!(ex1) as *mut c_void,
173            };
174            self.get_node_list_internal(
175                &arg,
176                timeout.as_millis().try_into().unwrap(),
177                cancellation_token,
178            )
179        }
180        .await??;
181        Ok(NodeList::from(com))
182    }
183
184    pub async fn get_partition_list(
185        &self,
186        desc: &ServicePartitionQueryDescription,
187        timeout: Duration,
188        cancellation_token: Option<BoxedCancelToken>,
189    ) -> crate::Result<ServicePartitionList> {
190        let com = {
191            let raw: FABRIC_SERVICE_PARTITION_QUERY_DESCRIPTION = desc.into();
192            let mili = timeout.as_millis() as u32;
193            self.get_partition_list_internal(&raw, mili, cancellation_token)
194        }
195        .await??;
196        Ok(ServicePartitionList::new(com))
197    }
198
199    pub async fn get_replica_list(
200        &self,
201        desc: &ServiceReplicaQueryDescription,
202        timeout: Duration,
203        cancellation_token: Option<BoxedCancelToken>,
204    ) -> crate::Result<ServiceReplicaList> {
205        let com = {
206            let raw: FABRIC_SERVICE_REPLICA_QUERY_DESCRIPTION = desc.into();
207            let mili = timeout.as_millis() as u32;
208            self.get_replica_list_internal(&raw, mili, cancellation_token)
209        }
210        .await??;
211        Ok(ServiceReplicaList::new(com))
212    }
213
214    pub async fn get_partition_load_information(
215        &self,
216        desc: &PartitionLoadInformationQueryDescription,
217        timeout: Duration,
218        cancellation_token: Option<BoxedCancelToken>,
219    ) -> crate::Result<PartitionLoadInformation> {
220        let com = {
221            let raw: FABRIC_PARTITION_LOAD_INFORMATION_QUERY_DESCRIPTION = desc.into();
222            let timeout_ms = timeout.as_micros() as u32;
223            self.get_partition_load_information_internal(&raw, timeout_ms, cancellation_token)
224        }
225        .await??;
226        Ok(PartitionLoadInformation::new(com))
227    }
228
229    pub async fn get_deployed_replica_detail(
230        &self,
231        desc: &DeployedServiceReplicaDetailQueryDescription,
232        timeout: Duration,
233        cancellation_token: Option<BoxedCancelToken>,
234    ) -> crate::Result<DeployedServiceReplicaDetailQueryResult> {
235        let com = {
236            let raw: FABRIC_DEPLOYED_SERVICE_REPLICA_DETAIL_QUERY_DESCRIPTION = desc.into();
237            let timeout_ms = timeout.as_micros() as u32;
238            self.get_deployed_replica_detail_internal(&raw, timeout_ms, cancellation_token)
239        }
240        .await??;
241        Ok(DeployedServiceReplicaDetailQueryResult::new(com))
242    }
243}