Skip to main content

mssf_core/client/
svc_mgmt_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{ffi::c_void, time::Duration};
7
8use crate::{
9    PCWSTR, WString,
10    mem::{BoxPool, GetRawWithBoxPool},
11    runtime::executor::BoxedCancelToken,
12    types::Uri,
13};
14use mssf_com::{
15    FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient8},
16    FabricTypes::{
17        FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
18        FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
19        FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
20        FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
21        FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
22        FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
23        FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
24        FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
25        FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY,
26        FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY,
27        FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
28        FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
29    },
30};
31
32use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
33
34use crate::types::{
35    RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
36};
37
38// Service Management Client
39#[derive(Debug, Clone)]
40pub struct ServiceManagementClient {
41    com: IFabricServiceManagementClient8,
42}
43
44impl ServiceManagementClient {
45    pub fn get_com(&self) -> IFabricServiceManagementClient8 {
46        self.com.clone()
47    }
48}
49// internal implementation block
50
51impl ServiceManagementClient {
52    fn resolve_service_partition_internal(
53        &self,
54        name: FABRIC_URI,
55        partition_key_type: FABRIC_PARTITION_KEY_TYPE,
56        partition_key: *const ::core::ffi::c_void,
57        previous_result: Option<&IFabricResolvedServicePartitionResult>, // This is different from generated code
58        timeout_milliseconds: u32,
59        cancellation_token: Option<BoxedCancelToken>,
60    ) -> FabricReceiver<crate::Result<IFabricResolvedServicePartitionResult>> {
61        let com1 = &self.com;
62        let com2 = self.com.clone();
63        fabric_begin_end_proxy(
64            move |callback| unsafe {
65                com1.BeginResolveServicePartition(
66                    name,
67                    partition_key_type,
68                    partition_key,
69                    previous_result,
70                    timeout_milliseconds,
71                    callback,
72                )
73            },
74            move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
75            cancellation_token,
76        )
77    }
78
79    fn restart_replica_internal(
80        &self,
81        desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
82        timeout_milliseconds: u32,
83        cancellation_token: Option<BoxedCancelToken>,
84    ) -> FabricReceiver<crate::Result<()>> {
85        let com1 = &self.com;
86        let com2 = self.com.clone();
87        fabric_begin_end_proxy(
88            move |callback| unsafe {
89                com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
90            },
91            move |ctx| unsafe { com2.EndRestartReplica(ctx) },
92            cancellation_token,
93        )
94    }
95
96    fn remove_replica_internal(
97        &self,
98        desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
99        timeout_milliseconds: u32,
100        cancellation_token: Option<BoxedCancelToken>,
101    ) -> FabricReceiver<crate::Result<()>> {
102        let com1 = &self.com;
103        let com2 = self.com.clone();
104        fabric_begin_end_proxy(
105            move |callback| unsafe {
106                com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
107            },
108            move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
109            cancellation_token,
110        )
111    }
112
113    fn register_service_notification_filter_internal(
114        &self,
115        desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
116        timeout_milliseconds: u32,
117        cancellation_token: Option<BoxedCancelToken>,
118    ) -> FabricReceiver<crate::Result<i64>> {
119        let com1 = &self.com;
120        let com2 = self.com.clone();
121        fabric_begin_end_proxy(
122            move |callback| unsafe {
123                com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
124            },
125            move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
126            cancellation_token,
127        )
128    }
129
130    fn unregister_service_notification_filter_internal(
131        &self,
132        filterid: i64,
133        timeout_milliseconds: u32,
134        cancellation_token: Option<BoxedCancelToken>,
135    ) -> FabricReceiver<crate::Result<()>> {
136        let com1 = &self.com;
137        let com2 = self.com.clone();
138        fabric_begin_end_proxy(
139            move |callback| unsafe {
140                com1.BeginUnregisterServiceNotificationFilter(
141                    filterid,
142                    timeout_milliseconds,
143                    callback,
144                )
145            },
146            move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
147            cancellation_token,
148        )
149    }
150
151    fn create_service_internal(
152        &self,
153        desc: &FABRIC_SERVICE_DESCRIPTION,
154        timeout_milliseconds: u32,
155        cancellation_token: Option<BoxedCancelToken>,
156    ) -> FabricReceiver<crate::Result<()>> {
157        let com1 = &self.com;
158        let com2 = self.com.clone();
159        fabric_begin_end_proxy(
160            move |callback| unsafe {
161                com1.BeginCreateService(desc, timeout_milliseconds, callback)
162            },
163            move |ctx| unsafe { com2.EndCreateService(ctx) },
164            cancellation_token,
165        )
166    }
167
168    fn update_service_internal(
169        &self,
170        name: FABRIC_URI,
171        desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
172        timeout_milliseconds: u32,
173        cancellation_token: Option<BoxedCancelToken>,
174    ) -> FabricReceiver<crate::Result<()>> {
175        let com1 = &self.com;
176        let com2 = self.com.clone();
177        fabric_begin_end_proxy(
178            move |callback| unsafe {
179                com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
180            },
181            move |ctx| unsafe { com2.EndUpdateService(ctx) },
182            cancellation_token,
183        )
184    }
185
186    fn delete_service_internal(
187        &self,
188        name: FABRIC_URI,
189        timeout_milliseconds: u32,
190        cancellation_token: Option<BoxedCancelToken>,
191    ) -> FabricReceiver<crate::Result<()>> {
192        let com1 = &self.com;
193        let com2 = self.com.clone();
194        fabric_begin_end_proxy(
195            move |callback| unsafe {
196                com1.BeginDeleteService(name, timeout_milliseconds, callback)
197            },
198            move |ctx| unsafe { com2.EndDeleteService(ctx) },
199            cancellation_token,
200        )
201    }
202}
203
204impl From<IFabricServiceManagementClient8> for ServiceManagementClient {
205    fn from(com: IFabricServiceManagementClient8) -> Self {
206        Self { com }
207    }
208}
209
210impl From<ServiceManagementClient> for IFabricServiceManagementClient8 {
211    fn from(value: ServiceManagementClient) -> Self {
212        value.com
213    }
214}
215
216// public implementation block - tokio required
217
218impl ServiceManagementClient {
219    // Resolve service partition
220    pub async fn resolve_service_partition(
221        &self,
222        name: &Uri,
223        key_type: &PartitionKeyType,
224        prev: Option<&ResolvedServicePartition>,
225        timeout: Duration,
226        cancellation_token: Option<BoxedCancelToken>,
227    ) -> crate::Result<ResolvedServicePartition> {
228        let com = {
229            let uri = name.as_raw();
230            // supply prev as null if not present
231            let prev_opt = prev.map(|x| &x.com);
232
233            let (key_type, key) = key_type.as_raw_parts();
234
235            self.resolve_service_partition_internal(
236                uri,
237                key_type,
238                key,
239                prev_opt,
240                timeout.as_millis().try_into().unwrap(),
241                cancellation_token,
242            )
243        }
244        .await??;
245        let res = ResolvedServicePartition::from(com);
246        Ok(res)
247    }
248
249    /// Simulates a service replica failure by restarting a persisted service replica,
250    /// closing the replica, and then reopening it. Use this to test your service for problems
251    /// along the replica reopen path. This helps simulate the report fault temporary path through client APIs.
252    /// This is only valid for replicas that belong to stateful persisted services.
253    pub async fn restart_replica(
254        &self,
255        desc: &RestartReplicaDescription,
256        timeout: Duration,
257        cancellation_token: Option<BoxedCancelToken>,
258    ) -> crate::Result<()> {
259        {
260            let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
261            self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
262        }
263        .await?
264    }
265
266    /// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
267    /// WARNING: There are no safety checks performed when this API is used.
268    /// Incorrect use of this API can lead to data loss for stateful services.
269    /// Remarks:
270    /// For stateless services, Instance Abort is called.
271    pub async fn remove_replica(
272        &self,
273        desc: &RemoveReplicaDescription,
274        timeout: Duration,
275        cancellation_token: Option<BoxedCancelToken>,
276    ) -> crate::Result<()> {
277        {
278            let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
279            self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
280        }
281        .await?
282    }
283
284    /// Remarks:
285    /// There is a cache of service endpoints in the client that gets updated by notifications
286    /// and this same cache is used to satisfy complaint based resolution requests
287    /// (see resolve_service_partition())). Applications that both register for notifications
288    /// and use complaint based resolution on the same client instance typically only need to
289    /// pass null for the ResolvedServicePartition argument during resolution.
290    /// This will always return the endpoints in the client cache updated by the latest notification.
291    /// The notification mechanism itself will keep the client cache updated when service endpoints change.
292    ///
293    /// Notification callback is delivered on `FabricClientBuilder::with_on_service_notification` as well.
294    /// The callback contains minimum info only as a signal, user can call resolve_service_partition()
295    /// again to retrieve full info from the cache.
296    ///
297    /// This is observed to have 1~4 secs delay compared with brute force complaint based resolve.
298    pub async fn register_service_notification_filter(
299        &self,
300        desc: &ServiceNotificationFilterDescription,
301        timeout: Duration,
302        cancellation_token: Option<BoxedCancelToken>,
303    ) -> crate::Result<FilterIdHandle> {
304        let id = {
305            let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
306            self.register_service_notification_filter_internal(
307                &raw,
308                timeout.as_millis() as u32,
309                cancellation_token,
310            )
311        }
312        .await??;
313        Ok(FilterIdHandle { id })
314    }
315
316    /// It's not necessary to unregister individual filters if the client itself
317    /// will no longer be used since all ServiceNotificationFilterDescription
318    /// objects registered by the FabricClient will be automatically unregistered when client is disposed.
319    pub async fn unregister_service_notification_filter(
320        &self,
321        filter_id_handle: FilterIdHandle,
322        timeout: Duration,
323        cancellation_token: Option<BoxedCancelToken>,
324    ) -> crate::Result<()> {
325        self.unregister_service_notification_filter_internal(
326            filter_id_handle.id,
327            timeout.as_millis() as u32,
328            cancellation_token,
329        )
330        .await?
331    }
332
333    pub async fn create_service(
334        &self,
335        desc: &crate::types::ServiceDescription,
336        timeout: Duration,
337        cancellation_token: Option<BoxedCancelToken>,
338    ) -> crate::Result<()> {
339        {
340            let mut pool = BoxPool::new();
341            let ffi_raw = desc.get_raw_with_pool(&mut pool);
342            self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
343        }
344        .await?
345    }
346
347    pub async fn update_service(
348        &self,
349        name: &Uri,
350        desc: &crate::types::ServiceUpdateDescription,
351        timeout: Duration,
352        cancellation_token: Option<BoxedCancelToken>,
353    ) -> crate::Result<()> {
354        {
355            let mut pool = BoxPool::new();
356            let ffi_raw = desc.get_raw_with_pool(&mut pool);
357            self.update_service_internal(
358                name.as_raw(),
359                &ffi_raw,
360                timeout.as_millis() as u32,
361                cancellation_token,
362            )
363        }
364        .await?
365    }
366
367    pub async fn delete_service(
368        &self,
369        name: &Uri,
370        timeout: Duration,
371        cancellation_token: Option<BoxedCancelToken>,
372    ) -> crate::Result<()> {
373        self.delete_service_internal(
374            name.as_raw(),
375            timeout.as_millis() as u32,
376            cancellation_token,
377        )
378        .await?
379    }
380}
381
382// Handle to the registered service notification filter
383
384#[derive(Debug, PartialEq)]
385pub struct FilterIdHandle {
386    pub(crate) id: i64,
387}
388
389// see ComFabricClient.cpp for conversion details in cpp
390#[derive(Debug, PartialEq, Clone)]
391pub enum PartitionKeyType {
392    Int64(i64),
393    Invalid,
394    None,
395    String(WString),
396}
397
398impl PartitionKeyType {
399    /// # Safety
400    /// Data must be valid for the given partition kind.
401    unsafe fn from_raw_parts(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
402        match svc {
403            ServicePartitionKind::Int64Range => {
404                let x = unsafe { (data as *mut i64).as_ref().unwrap() };
405                PartitionKeyType::Int64(*x)
406            }
407            ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
408            ServicePartitionKind::Singleton => PartitionKeyType::None,
409            ServicePartitionKind::Named => {
410                let x = data as *mut u16;
411                assert!(!x.is_null());
412                let s = WString::from(PCWSTR::from_raw(x));
413                PartitionKeyType::String(s)
414            }
415        }
416    }
417}
418
419impl PartitionKeyType {
420    // Get raw parts to pass in com api
421    fn as_raw_parts(&self) -> (FABRIC_PARTITION_KEY_TYPE, *const c_void) {
422        match self {
423            // Not sure if this is ok for i64
424            PartitionKeyType::Int64(x) => (
425                FABRIC_PARTITION_KEY_TYPE_INT64,
426                x as *const i64 as *const c_void,
427            ),
428            PartitionKeyType::Invalid => (FABRIC_PARTITION_KEY_TYPE_INVALID, std::ptr::null()),
429            PartitionKeyType::None => (FABRIC_PARTITION_KEY_TYPE_NONE, std::ptr::null()),
430            PartitionKeyType::String(x) => (
431                FABRIC_PARTITION_KEY_TYPE_STRING,
432                x.as_pcwstr().as_ptr() as *const c_void,
433            ),
434        }
435    }
436}
437
438#[derive(Clone, Copy, Debug, PartialEq)]
439pub enum ServicePartitionKind {
440    Int64Range,
441    Invalid,
442    Named,
443    Singleton,
444}
445
446impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
447    fn from(value: &ServicePartitionKind) -> Self {
448        match value {
449            ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
450            ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
451            ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
452            ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
453        }
454    }
455}
456
457impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
458    fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
459        match value {
460            FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
461            FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
462            FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
463            FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
464            _ => {
465                if cfg!(debug_assertions) {
466                    panic!("unknown type: {value:?}");
467                } else {
468                    ServicePartitionKind::Invalid
469                }
470            }
471        }
472    }
473}
474
475#[derive(Debug, Clone)]
476pub struct ResolvedServicePartition {
477    com: IFabricResolvedServicePartitionResult,
478    pub service_name: Uri,
479    pub service_partition_kind: ServicePartitionKind,
480    pub partition_key_type: PartitionKeyType,
481    pub endpoints: Vec<ResolvedServiceEndpoint>,
482}
483
484impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
485    fn from(com: IFabricResolvedServicePartitionResult) -> Self {
486        let raw = unsafe { com.get_Partition().as_ref().unwrap() };
487        let service_name = Uri::from(raw.ServiceName);
488        let kind_raw = raw.Info.Kind;
489        let val = raw.Info.Value;
490        let service_partition_kind: ServicePartitionKind = kind_raw.into();
491        let partition_key_type =
492            unsafe { PartitionKeyType::from_raw_parts(service_partition_kind, val) };
493        let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
494        Self {
495            com,
496            service_name,
497            service_partition_kind,
498            partition_key_type,
499            endpoints,
500        }
501    }
502}
503
504impl ResolvedServicePartition {
505    // If compared with different partition error is returned.
506    // to enable the user to identify which RSP is more
507    // up-to-date. A returned value of 0 indicates that the two RSPs have the same version. 1 indicates that the other RSP has an older version.
508    // -1 indicates that the other RSP has a newer version.
509    pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
510        unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
511    }
512}
513
514impl PartialEq for ResolvedServicePartition {
515    fn eq(&self, other: &Self) -> bool {
516        match self.compare_version(other) {
517            Ok(i) => i == 0,
518            Err(_) => false, // error comparing different services
519        }
520    }
521}
522
523impl PartialOrd for ResolvedServicePartition {
524    /// Compare the version of the resolved result.
525    /// a > b means partial_cmp(a,b) == Some(Greater) i.e. a.compare_version(b) > 0.
526    /// a is newer and up to date.
527    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
528        match self.compare_version(other) {
529            Ok(i) => Some(i.cmp(&0)),
530            // If you compare version of different service you get error
531            Err(_) => None,
532        }
533    }
534}
535
536#[derive(Debug, PartialEq, Eq, Clone, Copy)]
537pub enum ServiceEndpointRole {
538    Invalid,
539    StatefulPrimary,
540    StatefulPrimaryAuxiliary,
541    StatefulSecondary,
542    StatefulAuxiliary,
543    Stateless,
544}
545
546impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
547    fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
548        match value {
549            FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
550            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
551            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
552                ServiceEndpointRole::StatefulPrimaryAuxiliary
553            }
554            FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
555            FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
556            FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
557            _ => {
558                if cfg!(debug_assertions) {
559                    panic!("unknown type: {value:?}");
560                } else {
561                    ServiceEndpointRole::Invalid
562                }
563            }
564        }
565    }
566}
567
568#[derive(Debug, Clone, PartialEq)]
569pub struct ResolvedServiceEndpoint {
570    pub address: WString,
571    pub role: ServiceEndpointRole,
572}
573
574impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
575    fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
576        let raw = value;
577        Self {
578            address: WString::from(raw.Address),
579            role: raw.Role.into(),
580        }
581    }
582}
583
584#[cfg(test)]
585mod tests {
586    use crate::{PCWSTR, WString};
587
588    use super::{PartitionKeyType, ServicePartitionKind};
589
590    #[test]
591    fn test_conversion_int() {
592        let k = PartitionKeyType::Int64(99);
593        // check the raw ptr is ok
594        let (key_type, raw) = k.as_raw_parts();
595        assert_eq!(
596            key_type,
597            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_INT64
598        );
599        let i = unsafe { (raw as *const i64).as_ref().unwrap() };
600        assert_eq!(*i, 99);
601
602        let service_type = ServicePartitionKind::Int64Range;
603        // restore the key
604        let k2 = unsafe { PartitionKeyType::from_raw_parts(service_type, raw) };
605        assert_eq!(k, k2);
606    }
607
608    #[test]
609    fn test_conversion_string() {
610        let src = WString::from("mystr");
611        let k = PartitionKeyType::String(src.clone());
612        // check the raw ptr is ok
613        let (key_type, raw) = k.as_raw_parts();
614        assert_eq!(
615            key_type,
616            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_STRING
617        );
618        let s = WString::from(PCWSTR::from_raw(raw as *const u16));
619        assert_eq!(s, src);
620
621        let service_type = ServicePartitionKind::Named;
622        // restore the key
623        let k2 = unsafe { PartitionKeyType::from_raw_parts(service_type, raw) };
624        assert_eq!(k, k2);
625    }
626}