mssf_core/client/
svc_mgmt_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{ffi::c_void, time::Duration};
7
8use crate::{PCWSTR, WString, runtime::executor::BoxedCancelToken, types::Uri};
9use mssf_com::{
10    FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient6},
11    FabricTypes::{
12        FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
13        FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
14        FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
15        FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
16        FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
17        FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
18        FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
19        FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
20        FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY,
21        FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
22        FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
23    },
24};
25
26use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
27
28use crate::types::{
29    RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
30};
31
/// Service management client.
///
/// Wraps the `IFabricServiceManagementClient6` COM interface; the async methods
/// of this type forward to the Begin/End operation pairs of that interface.
#[derive(Debug, Clone)]
pub struct ServiceManagementClient {
    // Underlying COM interface every operation is dispatched through.
    com: IFabricServiceManagementClient6,
}
37
38impl ServiceManagementClient {
39    pub fn get_com(&self) -> IFabricServiceManagementClient6 {
40        self.com.clone()
41    }
42}
43// internal implementation block
44
45impl ServiceManagementClient {
46    fn resolve_service_partition_internal(
47        &self,
48        name: FABRIC_URI,
49        partition_key_type: FABRIC_PARTITION_KEY_TYPE,
50        partition_key: Option<*const ::core::ffi::c_void>,
51        previous_result: Option<&IFabricResolvedServicePartitionResult>, // This is different from generated code
52        timeout_milliseconds: u32,
53        cancellation_token: Option<BoxedCancelToken>,
54    ) -> FabricReceiver<crate::WinResult<IFabricResolvedServicePartitionResult>> {
55        let com1 = &self.com;
56        let com2 = self.com.clone();
57        fabric_begin_end_proxy(
58            move |callback| unsafe {
59                com1.BeginResolveServicePartition(
60                    name,
61                    partition_key_type,
62                    partition_key.unwrap_or(std::ptr::null()),
63                    previous_result,
64                    timeout_milliseconds,
65                    callback,
66                )
67            },
68            move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
69            cancellation_token,
70        )
71    }
72
73    fn restart_replica_internal(
74        &self,
75        desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
76        timeout_milliseconds: u32,
77        cancellation_token: Option<BoxedCancelToken>,
78    ) -> FabricReceiver<crate::WinResult<()>> {
79        let com1 = &self.com;
80        let com2 = self.com.clone();
81        fabric_begin_end_proxy(
82            move |callback| unsafe {
83                com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
84            },
85            move |ctx| unsafe { com2.EndRestartReplica(ctx) },
86            cancellation_token,
87        )
88    }
89
90    fn remove_replica_internal(
91        &self,
92        desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
93        timeout_milliseconds: u32,
94        cancellation_token: Option<BoxedCancelToken>,
95    ) -> FabricReceiver<crate::WinResult<()>> {
96        let com1 = &self.com;
97        let com2 = self.com.clone();
98        fabric_begin_end_proxy(
99            move |callback| unsafe {
100                com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
101            },
102            move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
103            cancellation_token,
104        )
105    }
106
107    fn register_service_notification_filter_internal(
108        &self,
109        desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
110        timeout_milliseconds: u32,
111        cancellation_token: Option<BoxedCancelToken>,
112    ) -> FabricReceiver<crate::WinResult<i64>> {
113        let com1 = &self.com;
114        let com2 = self.com.clone();
115        fabric_begin_end_proxy(
116            move |callback| unsafe {
117                com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
118            },
119            move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
120            cancellation_token,
121        )
122    }
123
124    fn unregister_service_notification_filter_internal(
125        &self,
126        filterid: i64,
127        timeout_milliseconds: u32,
128        cancellation_token: Option<BoxedCancelToken>,
129    ) -> FabricReceiver<crate::WinResult<()>> {
130        let com1 = &self.com;
131        let com2 = self.com.clone();
132        fabric_begin_end_proxy(
133            move |callback| unsafe {
134                com1.BeginUnregisterServiceNotificationFilter(
135                    filterid,
136                    timeout_milliseconds,
137                    callback,
138                )
139            },
140            move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
141            cancellation_token,
142        )
143    }
144
145    fn create_service_internal(
146        &self,
147        desc: &FABRIC_SERVICE_DESCRIPTION,
148        timeout_milliseconds: u32,
149        cancellation_token: Option<BoxedCancelToken>,
150    ) -> FabricReceiver<crate::WinResult<()>> {
151        let com1 = &self.com;
152        let com2 = self.com.clone();
153        fabric_begin_end_proxy(
154            move |callback| unsafe {
155                com1.BeginCreateService(desc, timeout_milliseconds, callback)
156            },
157            move |ctx| unsafe { com2.EndCreateService(ctx) },
158            cancellation_token,
159        )
160    }
161
162    fn update_service_internal(
163        &self,
164        name: FABRIC_URI,
165        desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
166        timeout_milliseconds: u32,
167        cancellation_token: Option<BoxedCancelToken>,
168    ) -> FabricReceiver<crate::WinResult<()>> {
169        let com1 = &self.com;
170        let com2 = self.com.clone();
171        fabric_begin_end_proxy(
172            move |callback| unsafe {
173                com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
174            },
175            move |ctx| unsafe { com2.EndUpdateService(ctx) },
176            cancellation_token,
177        )
178    }
179
180    fn delete_service_internal(
181        &self,
182        name: FABRIC_URI,
183        timeout_milliseconds: u32,
184        cancellation_token: Option<BoxedCancelToken>,
185    ) -> FabricReceiver<crate::WinResult<()>> {
186        let com1 = &self.com;
187        let com2 = self.com.clone();
188        fabric_begin_end_proxy(
189            move |callback| unsafe {
190                com1.BeginDeleteService(name, timeout_milliseconds, callback)
191            },
192            move |ctx| unsafe { com2.EndDeleteService(ctx) },
193            cancellation_token,
194        )
195    }
196}
197
198impl From<IFabricServiceManagementClient6> for ServiceManagementClient {
199    fn from(com: IFabricServiceManagementClient6) -> Self {
200        Self { com }
201    }
202}
203
204impl From<ServiceManagementClient> for IFabricServiceManagementClient6 {
205    fn from(value: ServiceManagementClient) -> Self {
206        value.com
207    }
208}
209
210// public implementation block - tokio required
211
212impl ServiceManagementClient {
213    // Resolve service partition
214    pub async fn resolve_service_partition(
215        &self,
216        name: &Uri,
217        key_type: &PartitionKeyType,
218        prev: Option<&ResolvedServicePartition>,
219        timeout: Duration,
220        cancellation_token: Option<BoxedCancelToken>,
221    ) -> crate::Result<ResolvedServicePartition> {
222        let com = {
223            let uri = name.as_raw();
224            // supply prev as null if not present
225            let prev_opt = prev.map(|x| &x.com);
226
227            let part_key_opt = key_type.get_raw_opt();
228
229            self.resolve_service_partition_internal(
230                uri,
231                key_type.into(),
232                part_key_opt,
233                prev_opt,
234                timeout.as_millis().try_into().unwrap(),
235                cancellation_token,
236            )
237        }
238        .await??;
239        let res = ResolvedServicePartition::from(com);
240        Ok(res)
241    }
242
243    /// Simulates a service replica failure by restarting a persisted service replica,
244    /// closing the replica, and then reopening it. Use this to test your service for problems
245    /// along the replica reopen path. This helps simulate the report fault temporary path through client APIs.
246    /// This is only valid for replicas that belong to stateful persisted services.
247    pub async fn restart_replica(
248        &self,
249        desc: &RestartReplicaDescription,
250        timeout: Duration,
251        cancellation_token: Option<BoxedCancelToken>,
252    ) -> crate::Result<()> {
253        {
254            let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
255            self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
256        }
257        .await?
258        .map_err(crate::Error::from)
259    }
260
261    /// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
262    /// WARNING: There are no safety checks performed when this API is used.
263    /// Incorrect use of this API can lead to data loss for stateful services.
264    /// Remarks:
265    /// For stateless services, Instance Abort is called.
266    pub async fn remove_replica(
267        &self,
268        desc: &RemoveReplicaDescription,
269        timeout: Duration,
270        cancellation_token: Option<BoxedCancelToken>,
271    ) -> crate::Result<()> {
272        {
273            let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
274            self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
275        }
276        .await?
277        .map_err(crate::Error::from)
278    }
279
280    /// Remarks:
281    /// There is a cache of service endpoints in the client that gets updated by notifications
282    /// and this same cache is used to satisfy complaint based resolution requests
283    /// (see resolve_service_partition())). Applications that both register for notifications
284    /// and use complaint based resolution on the same client instance typically only need to
285    /// pass null for the ResolvedServicePartition argument during resolution.
286    /// This will always return the endpoints in the client cache updated by the latest notification.
287    /// The notification mechanism itself will keep the client cache updated when service endpoints change.
288    ///
289    /// Notification callback is delivered on `FabricClientBuilder::with_on_service_notification` as well.
290    /// The callback contains minimum info only as a signal, user can call resolve_service_partition()
291    /// again to retrieve full info from the cache.
292    ///
293    /// This is observed to have 1~4 secs delay compared with brute force complaint based resolve.
294    pub async fn register_service_notification_filter(
295        &self,
296        desc: &ServiceNotificationFilterDescription,
297        timeout: Duration,
298        cancellation_token: Option<BoxedCancelToken>,
299    ) -> crate::Result<FilterIdHandle> {
300        let id = {
301            let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
302            self.register_service_notification_filter_internal(
303                &raw,
304                timeout.as_millis() as u32,
305                cancellation_token,
306            )
307        }
308        .await??;
309        Ok(FilterIdHandle { id })
310    }
311
312    /// It's not necessary to unregister individual filters if the client itself
313    /// will no longer be used since all ServiceNotificationFilterDescription
314    /// objects registered by the FabricClient will be automatically unregistered when client is disposed.
315    pub async fn unregister_service_notification_filter(
316        &self,
317        filter_id_handle: FilterIdHandle,
318        timeout: Duration,
319        cancellation_token: Option<BoxedCancelToken>,
320    ) -> crate::Result<()> {
321        self.unregister_service_notification_filter_internal(
322            filter_id_handle.id,
323            timeout.as_millis() as u32,
324            cancellation_token,
325        )
326        .await?
327        .map_err(crate::Error::from)
328    }
329
330    pub async fn create_service(
331        &self,
332        desc: &crate::types::ServiceDescription,
333        timeout: Duration,
334        cancellation_token: Option<BoxedCancelToken>,
335    ) -> crate::Result<()> {
336        {
337            let desc_raw = desc.build_raw();
338            let ffi_raw = desc_raw.as_ffi();
339            self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
340        }
341        .await?
342        .map_err(crate::Error::from)
343    }
344
345    pub async fn update_service(
346        &self,
347        name: &Uri,
348        desc: &crate::types::ServiceUpdateDescription,
349        timeout: Duration,
350        cancellation_token: Option<BoxedCancelToken>,
351    ) -> crate::Result<()> {
352        {
353            let desc_raw = desc.build_raw();
354            let ffi_raw = desc_raw.as_ffi();
355            self.update_service_internal(
356                name.as_raw(),
357                &ffi_raw,
358                timeout.as_millis() as u32,
359                cancellation_token,
360            )
361        }
362        .await?
363        .map_err(crate::Error::from)
364    }
365
366    pub async fn delete_service(
367        &self,
368        name: &Uri,
369        timeout: Duration,
370        cancellation_token: Option<BoxedCancelToken>,
371    ) -> crate::Result<()> {
372        self.delete_service_internal(
373            name.as_raw(),
374            timeout.as_millis() as u32,
375            cancellation_token,
376        )
377        .await?
378        .map_err(crate::Error::from)
379    }
380}
381
/// Handle to a registered service notification filter.
///
/// Produced by `register_service_notification_filter` and consumed by
/// `unregister_service_notification_filter`.

#[derive(Debug, PartialEq)]
pub struct FilterIdHandle {
    // Filter id returned by the register call.
    pub(crate) id: i64,
}
388
/// Partition key used to select a partition when resolving a service.
///
/// Each variant corresponds to one `FABRIC_PARTITION_KEY_TYPE_*` constant.
// see ComFabricClient.cpp for conversion details in cpp
#[derive(Debug, PartialEq, Clone)]
pub enum PartitionKeyType {
    /// 64-bit integer key (`FABRIC_PARTITION_KEY_TYPE_INT64`).
    Int64(i64),
    /// Invalid key (`FABRIC_PARTITION_KEY_TYPE_INVALID`); carries no payload.
    Invalid,
    /// No key (`FABRIC_PARTITION_KEY_TYPE_NONE`); carries no payload.
    None,
    /// String key (`FABRIC_PARTITION_KEY_TYPE_STRING`).
    String(WString),
}
397
398impl PartitionKeyType {
399    fn from_raw_svc_part(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
400        match svc {
401            ServicePartitionKind::Int64Range => {
402                let x = unsafe { (data as *mut i64).as_ref().unwrap() };
403                PartitionKeyType::Int64(*x)
404            }
405            ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
406            ServicePartitionKind::Singleton => PartitionKeyType::None,
407            ServicePartitionKind::Named => {
408                let x = data as *mut u16;
409                assert!(!x.is_null());
410                let s = WString::from(PCWSTR::from_raw(x));
411                PartitionKeyType::String(s)
412            }
413        }
414    }
415}
416
417impl From<&PartitionKeyType> for FABRIC_PARTITION_KEY_TYPE {
418    fn from(value: &PartitionKeyType) -> Self {
419        match value {
420            PartitionKeyType::Int64(_) => FABRIC_PARTITION_KEY_TYPE_INT64,
421            PartitionKeyType::Invalid => FABRIC_PARTITION_KEY_TYPE_INVALID,
422            PartitionKeyType::None => FABRIC_PARTITION_KEY_TYPE_NONE,
423            PartitionKeyType::String(_) => FABRIC_PARTITION_KEY_TYPE_STRING,
424        }
425    }
426}
427
428impl PartitionKeyType {
429    // get raw ptr to pass in com api
430    fn get_raw_opt(&self) -> Option<*const c_void> {
431        match self {
432            // Not sure if this is ok for i64
433            PartitionKeyType::Int64(x) => Some(x as *const i64 as *const c_void),
434            PartitionKeyType::Invalid => None,
435            PartitionKeyType::None => None,
436            PartitionKeyType::String(x) => {
437                Some(PCWSTR::from_raw(x.as_ptr()).as_ptr() as *const c_void)
438            }
439        }
440    }
441}
442
/// Partitioning scheme of a service.
///
/// Each variant corresponds to one `FABRIC_SERVICE_PARTITION_KIND_*` constant.
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum ServicePartitionKind {
    /// `FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE`.
    Int64Range,
    /// `FABRIC_SERVICE_PARTITION_KIND_INVALID`.
    Invalid,
    /// `FABRIC_SERVICE_PARTITION_KIND_NAMED`.
    Named,
    /// `FABRIC_SERVICE_PARTITION_KIND_SINGLETON`.
    Singleton,
}
450
451impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
452    fn from(value: &ServicePartitionKind) -> Self {
453        match value {
454            ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
455            ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
456            ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
457            ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
458        }
459    }
460}
461
462impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
463    fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
464        match value {
465            FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
466            FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
467            FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
468            FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
469            _ => {
470                if cfg!(debug_assertions) {
471                    panic!("unknown type: {value:?}");
472                } else {
473                    ServicePartitionKind::Invalid
474                }
475            }
476        }
477    }
478}
479
/// Result of resolving a service partition.
///
/// The public fields are copied out of the COM result when the value is built;
/// the COM object itself is retained for version comparison and for use as the
/// previous result in a later resolve call.
#[derive(Debug, Clone)]
pub struct ResolvedServicePartition {
    // COM result this value was created from.
    com: IFabricResolvedServicePartitionResult,
    /// Name of the resolved service.
    pub service_name: Uri,
    /// Partitioning scheme reported for the partition.
    pub service_partition_kind: ServicePartitionKind,
    /// Partition key rebuilt from the partition kind and its raw value.
    pub partition_key_type: PartitionKeyType,
    /// Endpoints reported for the partition.
    pub endpoints: Vec<ResolvedServiceEndpoint>,
}
488
489impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
490    fn from(com: IFabricResolvedServicePartitionResult) -> Self {
491        let raw = unsafe { com.get_Partition().as_ref().unwrap() };
492        let service_name = Uri::from(raw.ServiceName);
493        let kind_raw = raw.Info.Kind;
494        let val = raw.Info.Value;
495        let service_partition_kind: ServicePartitionKind = kind_raw.into();
496        let partition_key_type = PartitionKeyType::from_raw_svc_part(service_partition_kind, val);
497        let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
498        Self {
499            com,
500            service_name,
501            service_partition_kind,
502            partition_key_type,
503            endpoints,
504        }
505    }
506}
507
508impl ResolvedServicePartition {
509    // If compared with different partition error is returned.
510    // to enable the user to identify which RSP is more
511    // up-to-date. A returned value of 0 indicates that the two RSPs have the same version. 1 indicates that the other RSP has an older version.
512    // -1 indicates that the other RSP has a newer version.
513    pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
514        unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
515    }
516}
517
518impl PartialEq for ResolvedServicePartition {
519    fn eq(&self, other: &Self) -> bool {
520        match self.compare_version(other) {
521            Ok(i) => i == 0,
522            Err(_) => false, // error comparing different services
523        }
524    }
525}
526
527impl PartialOrd for ResolvedServicePartition {
528    /// Compare the version of the resolved result.
529    /// a > b means partial_cmp(a,b) == Some(Greater) i.e. a.compare_version(b) > 0.
530    /// a is newer and up to date.
531    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
532        match self.compare_version(other) {
533            Ok(i) => Some(i.cmp(&0)),
534            // If you compare version of different service you get error
535            Err(_) => None,
536        }
537    }
538}
539
/// Role of a resolved service endpoint.
///
/// Each variant corresponds to one `FABRIC_SERVICE_ROLE_*` constant.
#[derive(Debug, PartialEq, Clone)]
pub enum ServiceEndpointRole {
    /// `FABRIC_SERVICE_ROLE_INVALID`.
    Invalid,
    /// `FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY`.
    StatefulPrimary,
    /// `FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY`.
    StatefulSecondary,
    /// `FABRIC_SERVICE_ROLE_STATELESS`.
    Stateless,
}
547
548impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
549    fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
550        match value {
551            FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
552            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
553            FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
554            FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
555            _ => {
556                if cfg!(debug_assertions) {
557                    panic!("unknown type: {value:?}");
558                } else {
559                    ServiceEndpointRole::Invalid
560                }
561            }
562        }
563    }
564}
565
/// A single endpoint of a resolved service partition.
#[derive(Debug, Clone, PartialEq)]
pub struct ResolvedServiceEndpoint {
    /// Endpoint address, converted from the raw `Address` field.
    pub address: WString,
    /// Role of the endpoint, converted from the raw `Role` field.
    pub role: ServiceEndpointRole,
}
571
572impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
573    fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
574        let raw = value;
575        Self {
576            address: WString::from(raw.Address),
577            role: raw.Role.into(),
578        }
579    }
580}
581
#[cfg(test)]
mod tests {
    use super::{PartitionKeyType, ServicePartitionKind};
    use crate::{PCWSTR, WString};

    /// An `Int64` key exposes a pointer to its value and can be rebuilt from it.
    #[test]
    fn test_conversion_int() {
        let key = PartitionKeyType::Int64(99);

        // The raw pointer must read back the stored integer.
        let ptr = key.get_raw_opt().unwrap();
        let read_back = unsafe { (ptr as *const i64).as_ref() }.unwrap();
        assert_eq!(*read_back, 99);

        // Rebuilding from the pointer yields the same key.
        let restored = PartitionKeyType::from_raw_svc_part(ServicePartitionKind::Int64Range, ptr);
        assert_eq!(key, restored);
    }

    /// A `String` key exposes a pointer to its text and can be rebuilt from it.
    #[test]
    fn test_conversion_string() {
        let text = WString::from("mystr");
        let key = PartitionKeyType::String(text.clone());

        // The raw pointer must read back the stored string.
        let ptr = key.get_raw_opt().unwrap();
        let read_back = WString::from(PCWSTR::from_raw(ptr as *const u16));
        assert_eq!(read_back, text);

        // Rebuilding from the pointer yields the same key.
        let restored = PartitionKeyType::from_raw_svc_part(ServicePartitionKind::Named, ptr);
        assert_eq!(key, restored);
    }
}