Skip to main content

mssf_core/client/
svc_mgmt_client.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{ffi::c_void, time::Duration};
7
8use crate::{
9    PCWSTR, WString,
10    mem::{BoxPool, GetRawWithBoxPool},
11    runtime::executor::BoxedCancelToken,
12    types::Uri,
13};
14use mssf_com::{
15    FabricClient::{IFabricResolvedServicePartitionResult, IFabricServiceManagementClient8},
16    FabricTypes::{
17        FABRIC_PARTITION_KEY_TYPE, FABRIC_PARTITION_KEY_TYPE_INT64,
18        FABRIC_PARTITION_KEY_TYPE_INVALID, FABRIC_PARTITION_KEY_TYPE_NONE,
19        FABRIC_PARTITION_KEY_TYPE_STRING, FABRIC_REMOVE_REPLICA_DESCRIPTION,
20        FABRIC_RESOLVED_SERVICE_ENDPOINT, FABRIC_RESTART_REPLICA_DESCRIPTION,
21        FABRIC_SERVICE_DESCRIPTION, FABRIC_SERVICE_ENDPOINT_ROLE,
22        FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION, FABRIC_SERVICE_PARTITION_KIND,
23        FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE, FABRIC_SERVICE_PARTITION_KIND_INVALID,
24        FABRIC_SERVICE_PARTITION_KIND_NAMED, FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
25        FABRIC_SERVICE_ROLE_INVALID, FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY,
26        FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY, FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY,
27        FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY, FABRIC_SERVICE_ROLE_STATELESS,
28        FABRIC_SERVICE_UPDATE_DESCRIPTION, FABRIC_URI,
29    },
30};
31
32use crate::sync::{FabricReceiver, fabric_begin_end_proxy};
33
34use crate::types::{
35    RemoveReplicaDescription, RestartReplicaDescription, ServiceNotificationFilterDescription,
36};
37
38// Service Management Client
39#[derive(Debug, Clone)]
40pub struct ServiceManagementClient {
41    com: IFabricServiceManagementClient8,
42}
43
44impl ServiceManagementClient {
45    pub fn get_com(&self) -> IFabricServiceManagementClient8 {
46        self.com.clone()
47    }
48}
49// internal implementation block
50
51impl ServiceManagementClient {
52    fn resolve_service_partition_internal(
53        &self,
54        name: FABRIC_URI,
55        partition_key_type: FABRIC_PARTITION_KEY_TYPE,
56        partition_key: *const ::core::ffi::c_void,
57        previous_result: Option<&IFabricResolvedServicePartitionResult>, // This is different from generated code
58        timeout_milliseconds: u32,
59        cancellation_token: Option<BoxedCancelToken>,
60    ) -> FabricReceiver<crate::WinResult<IFabricResolvedServicePartitionResult>> {
61        let com1 = &self.com;
62        let com2 = self.com.clone();
63        fabric_begin_end_proxy(
64            move |callback| unsafe {
65                com1.BeginResolveServicePartition(
66                    name,
67                    partition_key_type,
68                    partition_key,
69                    previous_result,
70                    timeout_milliseconds,
71                    callback,
72                )
73            },
74            move |ctx| unsafe { com2.EndResolveServicePartition(ctx) },
75            cancellation_token,
76        )
77    }
78
79    fn restart_replica_internal(
80        &self,
81        desc: &FABRIC_RESTART_REPLICA_DESCRIPTION,
82        timeout_milliseconds: u32,
83        cancellation_token: Option<BoxedCancelToken>,
84    ) -> FabricReceiver<crate::WinResult<()>> {
85        let com1 = &self.com;
86        let com2 = self.com.clone();
87        fabric_begin_end_proxy(
88            move |callback| unsafe {
89                com1.BeginRestartReplica(desc, timeout_milliseconds, callback)
90            },
91            move |ctx| unsafe { com2.EndRestartReplica(ctx) },
92            cancellation_token,
93        )
94    }
95
96    fn remove_replica_internal(
97        &self,
98        desc: &FABRIC_REMOVE_REPLICA_DESCRIPTION,
99        timeout_milliseconds: u32,
100        cancellation_token: Option<BoxedCancelToken>,
101    ) -> FabricReceiver<crate::WinResult<()>> {
102        let com1 = &self.com;
103        let com2 = self.com.clone();
104        fabric_begin_end_proxy(
105            move |callback| unsafe {
106                com1.BeginRemoveReplica(desc, timeout_milliseconds, callback)
107            },
108            move |ctx| unsafe { com2.EndRemoveReplica(ctx) },
109            cancellation_token,
110        )
111    }
112
113    fn register_service_notification_filter_internal(
114        &self,
115        desc: &FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION,
116        timeout_milliseconds: u32,
117        cancellation_token: Option<BoxedCancelToken>,
118    ) -> FabricReceiver<crate::WinResult<i64>> {
119        let com1 = &self.com;
120        let com2 = self.com.clone();
121        fabric_begin_end_proxy(
122            move |callback| unsafe {
123                com1.BeginRegisterServiceNotificationFilter(desc, timeout_milliseconds, callback)
124            },
125            move |ctx| unsafe { com2.EndRegisterServiceNotificationFilter(ctx) },
126            cancellation_token,
127        )
128    }
129
130    fn unregister_service_notification_filter_internal(
131        &self,
132        filterid: i64,
133        timeout_milliseconds: u32,
134        cancellation_token: Option<BoxedCancelToken>,
135    ) -> FabricReceiver<crate::WinResult<()>> {
136        let com1 = &self.com;
137        let com2 = self.com.clone();
138        fabric_begin_end_proxy(
139            move |callback| unsafe {
140                com1.BeginUnregisterServiceNotificationFilter(
141                    filterid,
142                    timeout_milliseconds,
143                    callback,
144                )
145            },
146            move |ctx| unsafe { com2.EndUnregisterServiceNotificationFilter(ctx) },
147            cancellation_token,
148        )
149    }
150
151    fn create_service_internal(
152        &self,
153        desc: &FABRIC_SERVICE_DESCRIPTION,
154        timeout_milliseconds: u32,
155        cancellation_token: Option<BoxedCancelToken>,
156    ) -> FabricReceiver<crate::WinResult<()>> {
157        let com1 = &self.com;
158        let com2 = self.com.clone();
159        fabric_begin_end_proxy(
160            move |callback| unsafe {
161                com1.BeginCreateService(desc, timeout_milliseconds, callback)
162            },
163            move |ctx| unsafe { com2.EndCreateService(ctx) },
164            cancellation_token,
165        )
166    }
167
168    fn update_service_internal(
169        &self,
170        name: FABRIC_URI,
171        desc: &FABRIC_SERVICE_UPDATE_DESCRIPTION,
172        timeout_milliseconds: u32,
173        cancellation_token: Option<BoxedCancelToken>,
174    ) -> FabricReceiver<crate::WinResult<()>> {
175        let com1 = &self.com;
176        let com2 = self.com.clone();
177        fabric_begin_end_proxy(
178            move |callback| unsafe {
179                com1.BeginUpdateService(name, desc, timeout_milliseconds, callback)
180            },
181            move |ctx| unsafe { com2.EndUpdateService(ctx) },
182            cancellation_token,
183        )
184    }
185
186    fn delete_service_internal(
187        &self,
188        name: FABRIC_URI,
189        timeout_milliseconds: u32,
190        cancellation_token: Option<BoxedCancelToken>,
191    ) -> FabricReceiver<crate::WinResult<()>> {
192        let com1 = &self.com;
193        let com2 = self.com.clone();
194        fabric_begin_end_proxy(
195            move |callback| unsafe {
196                com1.BeginDeleteService(name, timeout_milliseconds, callback)
197            },
198            move |ctx| unsafe { com2.EndDeleteService(ctx) },
199            cancellation_token,
200        )
201    }
202}
203
204impl From<IFabricServiceManagementClient8> for ServiceManagementClient {
205    fn from(com: IFabricServiceManagementClient8) -> Self {
206        Self { com }
207    }
208}
209
210impl From<ServiceManagementClient> for IFabricServiceManagementClient8 {
211    fn from(value: ServiceManagementClient) -> Self {
212        value.com
213    }
214}
215
216// public implementation block - tokio required
217
218impl ServiceManagementClient {
219    // Resolve service partition
220    pub async fn resolve_service_partition(
221        &self,
222        name: &Uri,
223        key_type: &PartitionKeyType,
224        prev: Option<&ResolvedServicePartition>,
225        timeout: Duration,
226        cancellation_token: Option<BoxedCancelToken>,
227    ) -> crate::Result<ResolvedServicePartition> {
228        let com = {
229            let uri = name.as_raw();
230            // supply prev as null if not present
231            let prev_opt = prev.map(|x| &x.com);
232
233            let (key_type, key) = key_type.as_raw_parts();
234
235            self.resolve_service_partition_internal(
236                uri,
237                key_type,
238                key,
239                prev_opt,
240                timeout.as_millis().try_into().unwrap(),
241                cancellation_token,
242            )
243        }
244        .await??;
245        let res = ResolvedServicePartition::from(com);
246        Ok(res)
247    }
248
249    /// Simulates a service replica failure by restarting a persisted service replica,
250    /// closing the replica, and then reopening it. Use this to test your service for problems
251    /// along the replica reopen path. This helps simulate the report fault temporary path through client APIs.
252    /// This is only valid for replicas that belong to stateful persisted services.
253    pub async fn restart_replica(
254        &self,
255        desc: &RestartReplicaDescription,
256        timeout: Duration,
257        cancellation_token: Option<BoxedCancelToken>,
258    ) -> crate::Result<()> {
259        {
260            let raw: FABRIC_RESTART_REPLICA_DESCRIPTION = desc.into();
261            self.restart_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
262        }
263        .await?
264        .map_err(crate::Error::from)
265    }
266
267    /// This API gives a running replica the chance to cleanup its state and be gracefully shutdown.
268    /// WARNING: There are no safety checks performed when this API is used.
269    /// Incorrect use of this API can lead to data loss for stateful services.
270    /// Remarks:
271    /// For stateless services, Instance Abort is called.
272    pub async fn remove_replica(
273        &self,
274        desc: &RemoveReplicaDescription,
275        timeout: Duration,
276        cancellation_token: Option<BoxedCancelToken>,
277    ) -> crate::Result<()> {
278        {
279            let raw: FABRIC_REMOVE_REPLICA_DESCRIPTION = desc.into();
280            self.remove_replica_internal(&raw, timeout.as_millis() as u32, cancellation_token)
281        }
282        .await?
283        .map_err(crate::Error::from)
284    }
285
286    /// Remarks:
287    /// There is a cache of service endpoints in the client that gets updated by notifications
288    /// and this same cache is used to satisfy complaint based resolution requests
289    /// (see resolve_service_partition())). Applications that both register for notifications
290    /// and use complaint based resolution on the same client instance typically only need to
291    /// pass null for the ResolvedServicePartition argument during resolution.
292    /// This will always return the endpoints in the client cache updated by the latest notification.
293    /// The notification mechanism itself will keep the client cache updated when service endpoints change.
294    ///
295    /// Notification callback is delivered on `FabricClientBuilder::with_on_service_notification` as well.
296    /// The callback contains minimum info only as a signal, user can call resolve_service_partition()
297    /// again to retrieve full info from the cache.
298    ///
299    /// This is observed to have 1~4 secs delay compared with brute force complaint based resolve.
300    pub async fn register_service_notification_filter(
301        &self,
302        desc: &ServiceNotificationFilterDescription,
303        timeout: Duration,
304        cancellation_token: Option<BoxedCancelToken>,
305    ) -> crate::Result<FilterIdHandle> {
306        let id = {
307            let raw: FABRIC_SERVICE_NOTIFICATION_FILTER_DESCRIPTION = desc.into();
308            self.register_service_notification_filter_internal(
309                &raw,
310                timeout.as_millis() as u32,
311                cancellation_token,
312            )
313        }
314        .await??;
315        Ok(FilterIdHandle { id })
316    }
317
318    /// It's not necessary to unregister individual filters if the client itself
319    /// will no longer be used since all ServiceNotificationFilterDescription
320    /// objects registered by the FabricClient will be automatically unregistered when client is disposed.
321    pub async fn unregister_service_notification_filter(
322        &self,
323        filter_id_handle: FilterIdHandle,
324        timeout: Duration,
325        cancellation_token: Option<BoxedCancelToken>,
326    ) -> crate::Result<()> {
327        self.unregister_service_notification_filter_internal(
328            filter_id_handle.id,
329            timeout.as_millis() as u32,
330            cancellation_token,
331        )
332        .await?
333        .map_err(crate::Error::from)
334    }
335
336    pub async fn create_service(
337        &self,
338        desc: &crate::types::ServiceDescription,
339        timeout: Duration,
340        cancellation_token: Option<BoxedCancelToken>,
341    ) -> crate::Result<()> {
342        {
343            let mut pool = BoxPool::new();
344            let ffi_raw = desc.get_raw_with_pool(&mut pool);
345            self.create_service_internal(&ffi_raw, timeout.as_millis() as u32, cancellation_token)
346        }
347        .await?
348        .map_err(crate::Error::from)
349    }
350
351    pub async fn update_service(
352        &self,
353        name: &Uri,
354        desc: &crate::types::ServiceUpdateDescription,
355        timeout: Duration,
356        cancellation_token: Option<BoxedCancelToken>,
357    ) -> crate::Result<()> {
358        {
359            let mut pool = BoxPool::new();
360            let ffi_raw = desc.get_raw_with_pool(&mut pool);
361            self.update_service_internal(
362                name.as_raw(),
363                &ffi_raw,
364                timeout.as_millis() as u32,
365                cancellation_token,
366            )
367        }
368        .await?
369        .map_err(crate::Error::from)
370    }
371
372    pub async fn delete_service(
373        &self,
374        name: &Uri,
375        timeout: Duration,
376        cancellation_token: Option<BoxedCancelToken>,
377    ) -> crate::Result<()> {
378        self.delete_service_internal(
379            name.as_raw(),
380            timeout.as_millis() as u32,
381            cancellation_token,
382        )
383        .await?
384        .map_err(crate::Error::from)
385    }
386}
387
/// Handle to the registered service notification filter.
///
/// Produced by `ServiceManagementClient::register_service_notification_filter` and
/// consumed by `ServiceManagementClient::unregister_service_notification_filter`.
#[derive(Debug, PartialEq)]
pub struct FilterIdHandle {
    /// Filter id returned by the registration call.
    pub(crate) id: i64,
}
394
395// see ComFabricClient.cpp for conversion details in cpp
396#[derive(Debug, PartialEq, Clone)]
397pub enum PartitionKeyType {
398    Int64(i64),
399    Invalid,
400    None,
401    String(WString),
402}
403
404impl PartitionKeyType {
405    /// # Safety
406    /// Data must be valid for the given partition kind.
407    unsafe fn from_raw_parts(svc: ServicePartitionKind, data: *const c_void) -> PartitionKeyType {
408        match svc {
409            ServicePartitionKind::Int64Range => {
410                let x = unsafe { (data as *mut i64).as_ref().unwrap() };
411                PartitionKeyType::Int64(*x)
412            }
413            ServicePartitionKind::Invalid => PartitionKeyType::Invalid,
414            ServicePartitionKind::Singleton => PartitionKeyType::None,
415            ServicePartitionKind::Named => {
416                let x = data as *mut u16;
417                assert!(!x.is_null());
418                let s = WString::from(PCWSTR::from_raw(x));
419                PartitionKeyType::String(s)
420            }
421        }
422    }
423}
424
425impl PartitionKeyType {
426    // Get raw parts to pass in com api
427    fn as_raw_parts(&self) -> (FABRIC_PARTITION_KEY_TYPE, *const c_void) {
428        match self {
429            // Not sure if this is ok for i64
430            PartitionKeyType::Int64(x) => (
431                FABRIC_PARTITION_KEY_TYPE_INT64,
432                x as *const i64 as *const c_void,
433            ),
434            PartitionKeyType::Invalid => (FABRIC_PARTITION_KEY_TYPE_INVALID, std::ptr::null()),
435            PartitionKeyType::None => (FABRIC_PARTITION_KEY_TYPE_NONE, std::ptr::null()),
436            PartitionKeyType::String(x) => (
437                FABRIC_PARTITION_KEY_TYPE_STRING,
438                x.as_pcwstr().as_ptr() as *const c_void,
439            ),
440        }
441    }
442}
443
444#[derive(Clone, Copy, Debug, PartialEq)]
445pub enum ServicePartitionKind {
446    Int64Range,
447    Invalid,
448    Named,
449    Singleton,
450}
451
452impl From<&ServicePartitionKind> for FABRIC_SERVICE_PARTITION_KIND {
453    fn from(value: &ServicePartitionKind) -> Self {
454        match value {
455            ServicePartitionKind::Int64Range => FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE,
456            ServicePartitionKind::Invalid => FABRIC_SERVICE_PARTITION_KIND_INVALID,
457            ServicePartitionKind::Named => FABRIC_SERVICE_PARTITION_KIND_NAMED,
458            ServicePartitionKind::Singleton => FABRIC_SERVICE_PARTITION_KIND_SINGLETON,
459        }
460    }
461}
462
463impl From<FABRIC_SERVICE_PARTITION_KIND> for ServicePartitionKind {
464    fn from(value: FABRIC_SERVICE_PARTITION_KIND) -> Self {
465        match value {
466            FABRIC_SERVICE_PARTITION_KIND_INT64_RANGE => ServicePartitionKind::Int64Range,
467            FABRIC_SERVICE_PARTITION_KIND_INVALID => ServicePartitionKind::Invalid,
468            FABRIC_SERVICE_PARTITION_KIND_NAMED => ServicePartitionKind::Named,
469            FABRIC_SERVICE_PARTITION_KIND_SINGLETON => ServicePartitionKind::Singleton,
470            _ => {
471                if cfg!(debug_assertions) {
472                    panic!("unknown type: {value:?}");
473                } else {
474                    ServicePartitionKind::Invalid
475                }
476            }
477        }
478    }
479}
480
481#[derive(Debug, Clone)]
482pub struct ResolvedServicePartition {
483    com: IFabricResolvedServicePartitionResult,
484    pub service_name: Uri,
485    pub service_partition_kind: ServicePartitionKind,
486    pub partition_key_type: PartitionKeyType,
487    pub endpoints: Vec<ResolvedServiceEndpoint>,
488}
489
490impl From<IFabricResolvedServicePartitionResult> for ResolvedServicePartition {
491    fn from(com: IFabricResolvedServicePartitionResult) -> Self {
492        let raw = unsafe { com.get_Partition().as_ref().unwrap() };
493        let service_name = Uri::from(raw.ServiceName);
494        let kind_raw = raw.Info.Kind;
495        let val = raw.Info.Value;
496        let service_partition_kind: ServicePartitionKind = kind_raw.into();
497        let partition_key_type =
498            unsafe { PartitionKeyType::from_raw_parts(service_partition_kind, val) };
499        let endpoints = crate::iter::vec_from_raw_com(raw.EndpointCount as usize, raw.Endpoints);
500        Self {
501            com,
502            service_name,
503            service_partition_kind,
504            partition_key_type,
505            endpoints,
506        }
507    }
508}
509
510impl ResolvedServicePartition {
511    // If compared with different partition error is returned.
512    // to enable the user to identify which RSP is more
513    // up-to-date. A returned value of 0 indicates that the two RSPs have the same version. 1 indicates that the other RSP has an older version.
514    // -1 indicates that the other RSP has a newer version.
515    pub fn compare_version(&self, other: &ResolvedServicePartition) -> crate::Result<i32> {
516        unsafe { self.com.CompareVersion(&other.com) }.map_err(crate::Error::from)
517    }
518}
519
520impl PartialEq for ResolvedServicePartition {
521    fn eq(&self, other: &Self) -> bool {
522        match self.compare_version(other) {
523            Ok(i) => i == 0,
524            Err(_) => false, // error comparing different services
525        }
526    }
527}
528
529impl PartialOrd for ResolvedServicePartition {
530    /// Compare the version of the resolved result.
531    /// a > b means partial_cmp(a,b) == Some(Greater) i.e. a.compare_version(b) > 0.
532    /// a is newer and up to date.
533    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
534        match self.compare_version(other) {
535            Ok(i) => Some(i.cmp(&0)),
536            // If you compare version of different service you get error
537            Err(_) => None,
538        }
539    }
540}
541
542#[derive(Debug, PartialEq, Eq, Clone, Copy)]
543pub enum ServiceEndpointRole {
544    Invalid,
545    StatefulPrimary,
546    StatefulPrimaryAuxiliary,
547    StatefulSecondary,
548    StatefulAuxiliary,
549    Stateless,
550}
551
552impl From<FABRIC_SERVICE_ENDPOINT_ROLE> for ServiceEndpointRole {
553    fn from(value: FABRIC_SERVICE_ENDPOINT_ROLE) -> Self {
554        match value {
555            FABRIC_SERVICE_ROLE_INVALID => ServiceEndpointRole::Invalid,
556            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY => ServiceEndpointRole::StatefulPrimary,
557            FABRIC_SERVICE_ROLE_STATEFUL_PRIMARY_AUXILIARY => {
558                ServiceEndpointRole::StatefulPrimaryAuxiliary
559            }
560            FABRIC_SERVICE_ROLE_STATEFUL_SECONDARY => ServiceEndpointRole::StatefulSecondary,
561            FABRIC_SERVICE_ROLE_STATEFUL_AUXILIARY => ServiceEndpointRole::StatefulAuxiliary,
562            FABRIC_SERVICE_ROLE_STATELESS => ServiceEndpointRole::Stateless,
563            _ => {
564                if cfg!(debug_assertions) {
565                    panic!("unknown type: {value:?}");
566                } else {
567                    ServiceEndpointRole::Invalid
568                }
569            }
570        }
571    }
572}
573
574#[derive(Debug, Clone, PartialEq)]
575pub struct ResolvedServiceEndpoint {
576    pub address: WString,
577    pub role: ServiceEndpointRole,
578}
579
580impl From<&FABRIC_RESOLVED_SERVICE_ENDPOINT> for ResolvedServiceEndpoint {
581    fn from(value: &FABRIC_RESOLVED_SERVICE_ENDPOINT) -> Self {
582        let raw = value;
583        Self {
584            address: WString::from(raw.Address),
585            role: raw.Role.into(),
586        }
587    }
588}
589
#[cfg(test)]
mod tests {
    use super::{PartitionKeyType, ServicePartitionKind};
    use crate::{PCWSTR, WString};

    /// An `Int64` key exposes its value through the raw pointer and survives a
    /// round trip through `as_raw_parts` / `from_raw_parts`.
    #[test]
    fn test_conversion_int() {
        let original = PartitionKeyType::Int64(99);

        // The raw pointer must refer to the stored integer.
        let (raw_type, raw_ptr) = original.as_raw_parts();
        assert_eq!(
            raw_type,
            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_INT64
        );
        let pointee = unsafe { (raw_ptr as *const i64).as_ref() }.unwrap();
        assert_eq!(*pointee, 99);

        // Rebuilding the key from the raw parts yields the same key.
        let restored =
            unsafe { PartitionKeyType::from_raw_parts(ServicePartitionKind::Int64Range, raw_ptr) };
        assert_eq!(original, restored);
    }

    /// A `String` key exposes its text through the raw pointer and survives a
    /// round trip through `as_raw_parts` / `from_raw_parts`.
    #[test]
    fn test_conversion_string() {
        let text = WString::from("mystr");
        let original = PartitionKeyType::String(text.clone());

        // The raw pointer must refer to the stored wide string.
        let (raw_type, raw_ptr) = original.as_raw_parts();
        assert_eq!(
            raw_type,
            mssf_com::FabricTypes::FABRIC_PARTITION_KEY_TYPE_STRING
        );
        let read_back = WString::from(PCWSTR::from_raw(raw_ptr as *const u16));
        assert_eq!(read_back, text);

        // Rebuilding the key from the raw parts yields the same key.
        let restored =
            unsafe { PartitionKeyType::from_raw_parts(ServicePartitionKind::Named, raw_ptr) };
        assert_eq!(original, restored);
    }
}