Skip to main content

mssf_core/client/
svc_mgmt_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{ffi::c_void, time::Duration};
7
8use crate::{PCWSTR, WString, runtime::executor::BoxedCancelToken, types::Uri};
9use mssf_com::{
10    FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient6},
11    FabricTypes::{
12        FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
13        FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
14        FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
15        FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
16        FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
17        FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
18        FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
19        FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
20        FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY,
21        FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY,
22        FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
23        FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
24    },
25};
26
27use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
28
29use crate::types::{
30    RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
31};
32
33// Service Management Client
34#[derive(Debug, Clone)]
35pub struct ServiceManagementClient {
36    com: IFabricServiceManagementClient6,
37}
38
39impl ServiceManagementClient {
40    pub fn get_com(&self) -> IFabricServiceManagementClient6 {
41        self.com.clone()
42    }
43}
44// internal implementation block
45
46impl ServiceManagementClient {
47    fn resolve_service_partition_internal(
48        &self,
49        name: FABRIC_URI,
50        partition_key_type: FABRIC_PARTITION_KEY_TYPE,
51        partition_key: Option<*const ::core::ffi::c_void>,
52        previous_result: Option<&IFabricResolvedServicePartitionResult>, // This is different from generated code
53        timeout_milliseconds: u32,
54        cancellation_token: Option<BoxedCancelToken>,
55    ) -> FabricReceiver<crate::WinResult<IFabricResolvedServicePartitionResult>> {
56        let com1 = &self.com;
57        let com2 = self.com.clone();
58        fabric_begin_end_proxy(
59            move |callback| unsafe {
60                com1.BeginResolveServicePartition(
61                    name,
62                    partition_key_type,
63                    partition_key.unwrap_or(std::ptr::null()),
64                    previous_result,
65                    timeout_milliseconds,
66                    callback,
67                )
68            },
69            move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
70            cancellation_token,
71        )
72    }
73
74    fn restart_replica_internal(
75        &self,
76        desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
77        timeout_milliseconds: u32,
78        cancellation_token: Option<BoxedCancelToken>,
79    ) -> FabricReceiver<crate::WinResult<()>> {
80        let com1 = &self.com;
81        let com2 = self.com.clone();
82        fabric_begin_end_proxy(
83            move |callback| unsafe {
84                com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
85            },
86            move |ctx| unsafe { com2.EndRestartReplica(ctx) },
87            cancellation_token,
88        )
89    }
90
91    fn remove_replica_internal(
92        &self,
93        desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
94        timeout_milliseconds: u32,
95        cancellation_token: Option<BoxedCancelToken>,
96    ) -> FabricReceiver<crate::WinResult<()>> {
97        let com1 = &self.com;
98        let com2 = self.com.clone();
99        fabric_begin_end_proxy(
100            move |callback| unsafe {
101                com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
102            },
103            move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
104            cancellation_token,
105        )
106    }
107
108    fn register_service_notification_filter_internal(
109        &self,
110        desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
111        timeout_milliseconds: u32,
112        cancellation_token: Option<BoxedCancelToken>,
113    ) -> FabricReceiver<crate::WinResult<i64>> {
114        let com1 = &self.com;
115        let com2 = self.com.clone();
116        fabric_begin_end_proxy(
117            move |callback| unsafe {
118                com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
119            },
120            move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
121            cancellation_token,
122        )
123    }
124
125    fn unregister_service_notification_filter_internal(
126        &self,
127        filterid: i64,
128        timeout_milliseconds: u32,
129        cancellation_token: Option<BoxedCancelToken>,
130    ) -> FabricReceiver<crate::WinResult<()>> {
131        let com1 = &self.com;
132        let com2 = self.com.clone();
133        fabric_begin_end_proxy(
134            move |callback| unsafe {
135                com1.BeginUnregisterServiceNotificationFilter(
136                    filterid,
137                    timeout_milliseconds,
138                    callback,
139                )
140            },
141            move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
142            cancellation_token,
143        )
144    }
145
146    fn create_service_internal(
147        &self,
148        desc: &FABRIC_SERVICE_DESCRIPTION,
149        timeout_milliseconds: u32,
150        cancellation_token: Option<BoxedCancelToken>,
151    ) -> FabricReceiver<crate::WinResult<()>> {
152        let com1 = &self.com;
153        let com2 = self.com.clone();
154        fabric_begin_end_proxy(
155            move |callback| unsafe {
156                com1.BeginCreateService(desc, timeout_milliseconds, callback)
157            },
158            move |ctx| unsafe { com2.EndCreateService(ctx) },
159            cancellation_token,
160        )
161    }
162
163    fn update_service_internal(
164        &self,
165        name: FABRIC_URI,
166        desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
167        timeout_milliseconds: u32,
168        cancellation_token: Option<BoxedCancelToken>,
169    ) -> FabricReceiver<crate::WinResult<()>> {
170        let com1 = &self.com;
171        let com2 = self.com.clone();
172        fabric_begin_end_proxy(
173            move |callback| unsafe {
174                com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
175            },
176            move |ctx| unsafe { com2.EndUpdateService(ctx) },
177            cancellation_token,
178        )
179    }
180
181    fn delete_service_internal(
182        &self,
183        name: FABRIC_URI,
184        timeout_milliseconds: u32,
185        cancellation_token: Option<BoxedCancelToken>,
186    ) -> FabricReceiver<crate::WinResult<()>> {
187        let com1 = &self.com;
188        let com2 = self.com.clone();
189        fabric_begin_end_proxy(
190            move |callback| unsafe {
191                com1.BeginDeleteService(name, timeout_milliseconds, callback)
192            },
193            move |ctx| unsafe { com2.EndDeleteService(ctx) },
194            cancellation_token,
195        )
196    }
197}
198
199impl From<IFabricServiceManagementClient6> for ServiceManagementClient {
200    fn from(com: IFabricServiceManagementClient6) -> Self {
201        Self { com }
202    }
203}
204
205impl From<ServiceManagementClient> for IFabricServiceManagementClient6 {
206    fn from(value: ServiceManagementClient) -> Self {
207        value.com
208    }
209}
210
211// public implementation block - tokio required
212
213impl ServiceManagementClient {
214    // Resolve service partition
215    pub async fn resolve_service_partition(
216        &self,
217        name: &Uri,
218        key_type: &PartitionKeyType,
219        prev: Option<&ResolvedServicePartition>,
220        timeout: Duration,
221        cancellation_token: Option<BoxedCancelToken>,
222    ) -> crate::Result<ResolvedServicePartition> {
223        let com = {
224            let uri = name.as_raw();
225            // supply prev as null if not present
226            let prev_opt = prev.map(|x| &x.com);
227
228            let part_key_opt = key_type.get_raw_opt();
229
230            self.resolve_service_partition_internal(
231                uri,
232                key_type.into(),
233                part_key_opt,
234                prev_opt,
235                timeout.as_millis().try_into().unwrap(),
236                cancellation_token,
237            )
238        }
239        .await??;
240        let res = ResolvedServicePartition::from(com);
241        Ok(res)
242    }
243
244    /// Simulates a service replica failure by restarting a persisted service replica,
245    /// closing the replica, and then reopening it. Use this to test your service for problems
246    /// along the replica reopen path. This helps simulate the report fault temporary path through client APIs.
247    /// This is only valid for replicas that belong to stateful persisted services.
248    pub async fn restart_replica(
249        &self,
250        desc: &RestartReplicaDescription,
251        timeout: Duration,
252        cancellation_token: Option<BoxedCancelToken>,
253    ) -> crate::Result<()> {
254        {
255            let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
256            self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
257        }
258        .await?
259        .map_err(crate::Error::from)
260    }
261
262    /// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
263    /// WARNING: There are no safety checks performed when this API is used.
264    /// Incorrect use of this API can lead to data loss for stateful services.
265    /// Remarks:
266    /// For stateless services, Instance Abort is called.
267    pub async fn remove_replica(
268        &self,
269        desc: &RemoveReplicaDescription,
270        timeout: Duration,
271        cancellation_token: Option<BoxedCancelToken>,
272    ) -> crate::Result<()> {
273        {
274            let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
275            self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
276        }
277        .await?
278        .map_err(crate::Error::from)
279    }
280
281    /// Remarks:
282    /// There is a cache of service endpoints in the client that gets updated by notifications
283    /// and this same cache is used to satisfy complaint based resolution requests
284    /// (see resolve_service_partition())). Applications that both register for notifications
285    /// and use complaint based resolution on the same client instance typically only need to
286    /// pass null for the ResolvedServicePartition argument during resolution.
287    /// This will always return the endpoints in the client cache updated by the latest notification.
288    /// The notification mechanism itself will keep the client cache updated when service endpoints change.
289    ///
290    /// Notification callback is delivered on `FabricClientBuilder::with_on_service_notification` as well.
291    /// The callback contains minimum info only as a signal, user can call resolve_service_partition()
292    /// again to retrieve full info from the cache.
293    ///
294    /// This is observed to have 1~4 secs delay compared with brute force complaint based resolve.
295    pub async fn register_service_notification_filter(
296        &self,
297        desc: &ServiceNotificationFilterDescription,
298        timeout: Duration,
299        cancellation_token: Option<BoxedCancelToken>,
300    ) -> crate::Result<FilterIdHandle> {
301        let id = {
302            let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
303            self.register_service_notification_filter_internal(
304                &raw,
305                timeout.as_millis() as u32,
306                cancellation_token,
307            )
308        }
309        .await??;
310        Ok(FilterIdHandle { id })
311    }
312
313    /// It's not necessary to unregister individual filters if the client itself
314    /// will no longer be used since all ServiceNotificationFilterDescription
315    /// objects registered by the FabricClient will be automatically unregistered when client is disposed.
316    pub async fn unregister_service_notification_filter(
317        &self,
318        filter_id_handle: FilterIdHandle,
319        timeout: Duration,
320        cancellation_token: Option<BoxedCancelToken>,
321    ) -> crate::Result<()> {
322        self.unregister_service_notification_filter_internal(
323            filter_id_handle.id,
324            timeout.as_millis() as u32,
325            cancellation_token,
326        )
327        .await?
328        .map_err(crate::Error::from)
329    }
330
331    pub async fn create_service(
332        &self,
333        desc: &crate::types::ServiceDescription,
334        timeout: Duration,
335        cancellation_token: Option<BoxedCancelToken>,
336    ) -> crate::Result<()> {
337        {
338            let desc_raw = desc.build_raw();
339            let ffi_raw = desc_raw.as_ffi();
340            self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
341        }
342        .await?
343        .map_err(crate::Error::from)
344    }
345
346    pub async fn update_service(
347        &self,
348        name: &Uri,
349        desc: &crate::types::ServiceUpdateDescription,
350        timeout: Duration,
351        cancellation_token: Option<BoxedCancelToken>,
352    ) -> crate::Result<()> {
353        {
354            let desc_raw = desc.build_raw();
355            let ffi_raw = desc_raw.as_ffi();
356            self.update_service_internal(
357                name.as_raw(),
358                &ffi_raw,
359                timeout.as_millis() as u32,
360                cancellation_token,
361            )
362        }
363        .await?
364        .map_err(crate::Error::from)
365    }
366
367    pub async fn delete_service(
368        &self,
369        name: &Uri,
370        timeout: Duration,
371        cancellation_token: Option<BoxedCancelToken>,
372    ) -> crate::Result<()> {
373        self.delete_service_internal(
374            name.as_raw(),
375            timeout.as_millis() as u32,
376            cancellation_token,
377        )
378        .await?
379        .map_err(crate::Error::from)
380    }
381}
382
383// Handle to the registered service notification filter
384
385#[derive(Debug, PartialEq)]
386pub struct FilterIdHandle {
387    pub(crate) id: i64,
388}
389
390// see ComFabricClient.cpp for conversion details in cpp
391#[derive(Debug, PartialEq, Clone)]
392pub enum PartitionKeyType {
393    Int64(i64),
394    Invalid,
395    None,
396    String(WString),
397}
398
399impl PartitionKeyType {
400    fn from_raw_svc_part(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
401        match svc {
402            ServicePartitionKind::Int64Range => {
403                let x = unsafe { (data as *mut i64).as_ref().unwrap() };
404                PartitionKeyType::Int64(*x)
405            }
406            ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
407            ServicePartitionKind::Singleton => PartitionKeyType::None,
408            ServicePartitionKind::Named => {
409                let x = data as *mut u16;
410                assert!(!x.is_null());
411                let s = WString::from(PCWSTR::from_raw(x));
412                PartitionKeyType::String(s)
413            }
414        }
415    }
416}
417
418impl From<&PartitionKeyType> for FABRIC_PARTITION_KEY_TYPE {
419    fn from(value: &PartitionKeyType) -> Self {
420        match value {
421            PartitionKeyType::Int64(_) => FABRIC_PARTITION_KEY_TYPE_INT64,
422            PartitionKeyType::Invalid => FABRIC_PARTITION_KEY_TYPE_INVALID,
423            PartitionKeyType::None => FABRIC_PARTITION_KEY_TYPE_NONE,
424            PartitionKeyType::String(_) => FABRIC_PARTITION_KEY_TYPE_STRING,
425        }
426    }
427}
428
429impl PartitionKeyType {
430    // get raw ptr to pass in com api
431    fn get_raw_opt(&self) -> Option<*const c_void> {
432        match self {
433            // Not sure if this is ok for i64
434            PartitionKeyType::Int64(x) => Some(x as *const i64 as *const c_void),
435            PartitionKeyType::Invalid => None,
436            PartitionKeyType::None => None,
437            PartitionKeyType::String(x) => {
438                Some(PCWSTR::from_raw(x.as_ptr()).as_ptr() as *const c_void)
439            }
440        }
441    }
442}
443
444#[derive(Clone, Copy, Debug, PartialEq)]
445pub enum ServicePartitionKind {
446    Int64Range,
447    Invalid,
448    Named,
449    Singleton,
450}
451
452impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
453    fn from(value: &ServicePartitionKind) -> Self {
454        match value {
455            ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
456            ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
457            ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
458            ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
459        }
460    }
461}
462
463impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
464    fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
465        match value {
466            FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
467            FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
468            FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
469            FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
470            _ => {
471                if cfg!(debug_assertions) {
472                    panic!("unknown type: {value:?}");
473                } else {
474                    ServicePartitionKind::Invalid
475                }
476            }
477        }
478    }
479}
480
481#[derive(Debug, Clone)]
482pub struct ResolvedServicePartition {
483    com: IFabricResolvedServicePartitionResult,
484    pub service_name: Uri,
485    pub service_partition_kind: ServicePartitionKind,
486    pub partition_key_type: PartitionKeyType,
487    pub endpoints: Vec<ResolvedServiceEndpoint>,
488}
489
490impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
491    fn from(com: IFabricResolvedServicePartitionResult) -> Self {
492        let raw = unsafe { com.get_Partition().as_ref().unwrap() };
493        let service_name = Uri::from(raw.ServiceName);
494        let kind_raw = raw.Info.Kind;
495        let val = raw.Info.Value;
496        let service_partition_kind: ServicePartitionKind = kind_raw.into();
497        let partition_key_type = PartitionKeyType::from_raw_svc_part(service_partition_kind, val);
498        let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
499        Self {
500            com,
501            service_name,
502            service_partition_kind,
503            partition_key_type,
504            endpoints,
505        }
506    }
507}
508
509impl ResolvedServicePartition {
510    // If compared with different partition error is returned.
511    // to enable the user to identify which RSP is more
512    // up-to-date. A returned value of 0 indicates that the two RSPs have the same version. 1 indicates that the other RSP has an older version.
513    // -1 indicates that the other RSP has a newer version.
514    pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
515        unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
516    }
517}
518
519impl PartialEq for ResolvedServicePartition {
520    fn eq(&self, other: &Self) -> bool {
521        match self.compare_version(other) {
522            Ok(i) => i == 0,
523            Err(_) => false, // error comparing different services
524        }
525    }
526}
527
528impl PartialOrd for ResolvedServicePartition {
529    /// Compare the version of the resolved result.
530    /// a > b means partial_cmp(a,b) == Some(Greater) i.e. a.compare_version(b) > 0.
531    /// a is newer and up to date.
532    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
533        match self.compare_version(other) {
534            Ok(i) => Some(i.cmp(&0)),
535            // If you compare version of different service you get error
536            Err(_) => None,
537        }
538    }
539}
540
541#[derive(Debug, PartialEq, Eq, Clone, Copy)]
542pub enum ServiceEndpointRole {
543    Invalid,
544    StatefulPrimary,
545    StatefulPrimaryAuxiliary,
546    StatefulSecondary,
547    StatefulAuxiliary,
548    Stateless,
549}
550
551impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
552    fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
553        match value {
554            FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
555            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
556            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
557                ServiceEndpointRole::StatefulPrimaryAuxiliary
558            }
559            FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
560            FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
561            FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
562            _ => {
563                if cfg!(debug_assertions) {
564                    panic!("unknown type: {value:?}");
565                } else {
566                    ServiceEndpointRole::Invalid
567                }
568            }
569        }
570    }
571}
572
573#[derive(Debug, Clone, PartialEq)]
574pub struct ResolvedServiceEndpoint {
575    pub address: WString,
576    pub role: ServiceEndpointRole,
577}
578
579impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
580    fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
581        let raw = value;
582        Self {
583            address: WString::from(raw.Address),
584            role: raw.Role.into(),
585        }
586    }
587}
588
589#[cfg(test)]
590mod tests {
591    use crate::{PCWSTR, WString};
592
593    use super::{PartitionKeyType, ServicePartitionKind};
594
595    #[test]
596    fn test_conversion_int() {
597        let k = PartitionKeyType::Int64(99);
598        // check the raw ptr is ok
599        let raw = k.get_raw_opt();
600        let i = unsafe { (raw.unwrap() as *const i64).as_ref().unwrap() };
601        assert_eq!(*i, 99);
602
603        let service_type = ServicePartitionKind::Int64Range;
604        // restore the key
605        let k2 = PartitionKeyType::from_raw_svc_part(service_type, raw.unwrap());
606        assert_eq!(k, k2);
607    }
608
609    #[test]
610    fn test_conversion_string() {
611        let src = WString::from("mystr");
612        let k = PartitionKeyType::String(src.clone());
613        // check the raw ptr is ok
614        let raw = k.get_raw_opt();
615        let s = WString::from(PCWSTR::from_raw(raw.unwrap() as *const u16));
616        assert_eq!(s, src);
617
618        let service_type = ServicePartitionKind::Named;
619        // restore the key
620        let k2 = PartitionKeyType::from_raw_svc_part(service_type, raw.unwrap());
621        assert_eq!(k, k2);
622    }
623}