mssf_core/runtime/
stateful_proxy.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6// stateful_proxy is a wrapper layer around com api,
7// making manipulating com simple.
8
9use std::ffi::c_void;
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken};
12use mssf_com::FabricRuntime::{
13    IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14    IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18    error::ErrorCode,
19    strings::WStringWrap,
20    sync::fabric_begin_end_proxy,
21    types::{
22        FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
23        ServicePartitionAccessStatus, ServicePartitionInformation,
24    },
25};
26
27use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
28use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
29
30pub struct StatefulServiceReplicaProxy {
31    com_impl: IFabricStatefulServiceReplica,
32}
33
34impl StatefulServiceReplicaProxy {
35    pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
36        StatefulServiceReplicaProxy { com_impl }
37    }
38}
39
40impl StatefulServiceReplica for StatefulServiceReplicaProxy {
41    #[cfg_attr(
42        feature = "tracing",
43        tracing::instrument(skip_all, level = "debug", err) // TODO: trace ret
44    )]
45    async fn open(
46        &self,
47        openmode: OpenMode,
48        partition: &StatefulServicePartition,
49        cancellation_token: BoxedCancelToken,
50    ) -> crate::Result<impl PrimaryReplicator> {
51        let com1 = &self.com_impl;
52        let com2 = self.com_impl.clone();
53        let rx = fabric_begin_end_proxy(
54            move |callback| unsafe {
55                com1.BeginOpen(openmode.into(), partition.get_com(), callback)
56            },
57            move |ctx| unsafe { com2.EndOpen(ctx) },
58            Some(cancellation_token),
59        );
60        let rplctr = rx.await??;
61
62        // Check COM interface is implemented.
63        let catchup_specific_quorum = rplctr
64            .cast::<IFabricReplicatorCatchupSpecificQuorum>()
65            .is_ok();
66        assert!(
67            catchup_specific_quorum,
68            "mssf does not support replicator without catchup_specific_quorum interface"
69        );
70
71        // TODO: cast without clone will cause access violation on AddRef in SF runtime.
72        let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); // must work
73        // Replicator must impl primary replicator as well.
74
75        let res = PrimaryReplicatorProxy::new(p_rplctr);
76        Ok(res)
77    }
78
79    #[cfg_attr(
80        feature = "tracing",
81        tracing::instrument(skip_all, level = "debug", ret, err)
82    )]
83    async fn change_role(
84        &self,
85        newrole: ReplicaRole,
86        cancellation_token: BoxedCancelToken,
87    ) -> crate::Result<WString> {
88        // replica address
89        let com1 = &self.com_impl;
90        let com2 = self.com_impl.clone();
91        let rx = fabric_begin_end_proxy(
92            move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
93            move |ctx| unsafe { com2.EndChangeRole(ctx) },
94            Some(cancellation_token),
95        );
96        let addr = rx.await??;
97        Ok(WStringWrap::from(&addr).into())
98    }
99
100    #[cfg_attr(
101        feature = "tracing",
102        tracing::instrument(skip_all, level = "debug", ret, err)
103    )]
104    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
105        let com1 = &self.com_impl;
106        let com2 = self.com_impl.clone();
107        let rx = fabric_begin_end_proxy(
108            move |callback| unsafe { com1.BeginClose(callback) },
109            move |ctx| unsafe { com2.EndClose(ctx) },
110            Some(cancellation_token),
111        );
112        rx.await?.map_err(crate::Error::from)
113    }
114    #[cfg_attr(
115        feature = "tracing",
116        tracing::instrument(skip_all, level = "debug", ret)
117    )]
118    fn abort(&self) {
119        unsafe { self.com_impl.Abort() }
120    }
121}
122
123pub struct ReplicatorProxy {
124    com_impl: IFabricReplicator,
125}
126
127impl ReplicatorProxy {
128    fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
129        ReplicatorProxy { com_impl }
130    }
131}
132
133impl Replicator for ReplicatorProxy {
134    #[cfg_attr(
135        feature = "tracing",
136        tracing::instrument(skip_all, level = "debug", ret, err)
137    )]
138    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
139        // replicator address
140        let com1 = &self.com_impl;
141        let com2 = self.com_impl.clone();
142        let rx = fabric_begin_end_proxy(
143            move |callback| unsafe { com1.BeginOpen(callback) },
144            move |ctx| unsafe { com2.EndOpen(ctx) },
145            Some(cancellation_token),
146        );
147        let addr = rx.await??;
148        Ok(WStringWrap::from(&addr).into())
149    }
150    #[cfg_attr(
151        feature = "tracing",
152        tracing::instrument(skip_all, level = "debug", ret, err)
153    )]
154    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
155        let com1 = &self.com_impl;
156        let com2 = self.com_impl.clone();
157        let rx = fabric_begin_end_proxy(
158            move |callback| unsafe { com1.BeginClose(callback) },
159            move |ctx| unsafe { com2.EndClose(ctx) },
160            Some(cancellation_token),
161        );
162        rx.await?.map_err(crate::Error::from)
163    }
164    #[cfg_attr(
165        feature = "tracing",
166        tracing::instrument(skip_all, level = "debug", ret, err)
167    )]
168    async fn change_role(
169        &self,
170        epoch: &Epoch,
171        role: &ReplicaRole,
172        cancellation_token: BoxedCancelToken,
173    ) -> crate::Result<()> {
174        let com1 = &self.com_impl;
175        let com2 = self.com_impl.clone();
176        let rx = fabric_begin_end_proxy(
177            move |callback| unsafe { com1.BeginChangeRole(&epoch.into(), role.into(), callback) },
178            move |ctx| unsafe { com2.EndChangeRole(ctx) },
179            Some(cancellation_token),
180        );
181        rx.await?.map_err(crate::Error::from)
182    }
183    #[cfg_attr(
184        feature = "tracing",
185        tracing::instrument(skip_all, level = "debug", ret, err)
186    )]
187    async fn update_epoch(
188        &self,
189        epoch: &Epoch,
190        cancellation_token: BoxedCancelToken,
191    ) -> crate::Result<()> {
192        let com1 = &self.com_impl;
193        let com2 = self.com_impl.clone();
194        let rx = fabric_begin_end_proxy(
195            move |callback| unsafe { com1.BeginUpdateEpoch(&epoch.into(), callback) },
196            move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
197            Some(cancellation_token),
198        );
199        rx.await?.map_err(crate::Error::from)
200    }
201    #[cfg_attr(
202        feature = "tracing",
203        tracing::instrument(skip_all, level = "debug", ret, err)
204    )]
205    #[cfg_attr(
206        feature = "tracing",
207        tracing::instrument(skip_all, level = "debug", ret, err)
208    )]
209    fn get_current_progress(&self) -> crate::Result<i64> {
210        unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
211    }
212    #[cfg_attr(
213        feature = "tracing",
214        tracing::instrument(skip_all, level = "debug", ret, err)
215    )]
216    fn get_catch_up_capability(&self) -> crate::Result<i64> {
217        unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
218    }
219    #[cfg_attr(
220        feature = "tracing",
221        tracing::instrument(skip_all, level = "debug", ret)
222    )]
223    fn abort(&self) {
224        unsafe { self.com_impl.Abort() }
225    }
226}
227
228pub struct PrimaryReplicatorProxy {
229    com_impl: IFabricPrimaryReplicator,
230    parent: ReplicatorProxy,
231}
232
233impl PrimaryReplicatorProxy {
234    pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
235        let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
236        PrimaryReplicatorProxy { com_impl, parent }
237    }
238}
239
240impl Replicator for PrimaryReplicatorProxy {
241    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
242        self.parent.open(cancellation_token).await
243    }
244    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
245        self.parent.close(cancellation_token).await
246    }
247    async fn change_role(
248        &self,
249        epoch: &Epoch,
250        role: &ReplicaRole,
251        cancellation_token: BoxedCancelToken,
252    ) -> crate::Result<()> {
253        self.parent
254            .change_role(epoch, role, cancellation_token)
255            .await
256    }
257    async fn update_epoch(
258        &self,
259        epoch: &Epoch,
260        cancellation_token: BoxedCancelToken,
261    ) -> crate::Result<()> {
262        self.parent.update_epoch(epoch, cancellation_token).await
263    }
264    fn get_current_progress(&self) -> crate::Result<i64> {
265        self.parent.get_current_progress()
266    }
267    fn get_catch_up_capability(&self) -> crate::Result<i64> {
268        self.parent.get_catch_up_capability()
269    }
270    fn abort(&self) {
271        self.parent.abort()
272    }
273}
274
275impl PrimaryReplicator for PrimaryReplicatorProxy {
276    #[cfg_attr(
277        feature = "tracing",
278        tracing::instrument(skip_all, level = "debug", ret, err)
279    )]
280    async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
281        let com1 = &self.com_impl;
282        let com2 = self.com_impl.clone();
283        let rx = fabric_begin_end_proxy(
284            move |callback| unsafe { com1.BeginOnDataLoss(callback) },
285            move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
286            Some(cancellation_token),
287        );
288        rx.await?.map_err(crate::Error::from)
289    }
290    #[cfg_attr(
291        feature = "tracing",
292        tracing::instrument(skip_all, level = "debug", ret, err)
293    )]
294    fn update_catch_up_replica_set_configuration(
295        &self,
296        currentconfiguration: &ReplicaSetConfig,
297        previousconfiguration: &ReplicaSetConfig,
298    ) -> crate::Result<()> {
299        let cc_view = currentconfiguration.get_view();
300        let pc_view = previousconfiguration.get_view();
301        unsafe {
302            self.com_impl
303                .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
304        }
305        .map_err(crate::Error::from)
306    }
307    #[cfg_attr(
308        feature = "tracing",
309        tracing::instrument(skip_all, level = "debug", ret, err)
310    )]
311    async fn wait_for_catch_up_quorum(
312        &self,
313        catchupmode: ReplicaSetQuorumMode,
314        cancellation_token: BoxedCancelToken,
315    ) -> crate::Result<()> {
316        let com1 = &self.com_impl;
317        let com2 = self.com_impl.clone();
318        let rx = fabric_begin_end_proxy(
319            move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
320            move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
321            Some(cancellation_token),
322        );
323        rx.await?.map_err(crate::Error::from)
324    }
325    #[cfg_attr(
326        feature = "tracing",
327        tracing::instrument(skip_all, level = "debug", ret, err)
328    )]
329    fn update_current_replica_set_configuration(
330        &self,
331        currentconfiguration: &ReplicaSetConfig,
332    ) -> crate::Result<()> {
333        unsafe {
334            self.com_impl
335                .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
336        }
337        .map_err(crate::Error::from)
338    }
339    #[cfg_attr(
340        feature = "tracing",
341        tracing::instrument(skip_all, level = "debug", ret, err)
342    )]
343    async fn build_replica(
344        &self,
345        replica: &ReplicaInformation,
346        cancellation_token: BoxedCancelToken,
347    ) -> crate::Result<()> {
348        let com1 = &self.com_impl;
349        let com2 = self.com_impl.clone();
350        let rx = fabric_begin_end_proxy(
351            move |callback| {
352                let (mut info, ex1) = replica.get_raw_parts();
353                info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
354                unsafe { com1.BeginBuildReplica(&info, callback) }
355            },
356            move |ctx| unsafe { com2.EndBuildReplica(ctx) },
357            Some(cancellation_token),
358        );
359        rx.await?.map_err(crate::Error::from)
360    }
361    #[cfg_attr(
362        feature = "tracing",
363        tracing::instrument(skip_all, level = "debug", ret, err)
364    )]
365    fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
366        unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
367    }
368}
369
370/// Proxy COM object IFabricStatefulServicePartition3
371#[derive(Debug, Clone)]
372pub struct StatefulServicePartition {
373    com_impl: IFabricStatefulServicePartition3,
374}
375
376impl StatefulServicePartition {
377    pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
378        &self.com_impl
379    }
380
381    /// Provides access to the ServicePartitionInformation of the service, which contains the partition type and ID.
382    pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
383        unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
384            .ok_or(ErrorCode::E_POINTER.into())
385            .map(ServicePartitionInformation::from)
386    }
387
388    /// Used to check the readiness of the replica in regard to read operations.
389    /// The ReadStatus should be checked before the replica is servicing a customer request that is a read operation.
390    pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
391        unsafe { self.com_impl.GetReadStatus() }
392            .map(ServicePartitionAccessStatus::from)
393            .map_err(crate::Error::from)
394    }
395
396    /// Used to check the readiness of the partition in regard to write operations.
397    /// The WriteStatus should be checked before the replica services a customer request that is a write operation.
398    pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
399        unsafe { self.com_impl.GetWriteStatus() }
400            .map(ServicePartitionAccessStatus::from)
401            .map_err(crate::Error::from)
402    }
403
404    /// TODO: not implemented
405    /// Creates a FabricReplicator with the specified settings and returns it to the replica.
406    pub fn create_replicator(&self) -> crate::Result<()> {
407        Err(ErrorCode::E_NOTIMPL.into())
408    }
409
410    /// Reports load for the current replica in the partition.
411    /// Remarks:
412    /// The reported metrics should correspond to those that are provided in the ServiceLoadMetricDescription
413    /// as a part of the ServiceDescription that is used to create the service. Load metrics that are not
414    /// present in the description are ignored. Reporting custom metrics allows Service Fabric to balance
415    /// services that are based on additional custom information.
416    pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
417        let metrics_ref = LoadMetricListRef::from_slice(metrics);
418        let raw = metrics_ref.as_raw_slice();
419        unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
420    }
421
422    /// Enables the replica to report a fault to the runtime and indicates that it has encountered
423    /// an error from which it cannot recover and must either be restarted or removed.
424    pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
425        unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
426    }
427
428    /// Reports the move cost for a replica.
429    /// Remarks:
430    /// Services can report move cost of a replica using this method.
431    /// While the Service Fabric Resource Balances searches for the best balance in the cluster,
432    /// it examines both load information and move cost of each replica.
433    /// Resource balances will prefer to move replicas with lower cost in order to achieve balance.
434    pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
435        unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
436    }
437
438    // Remarks:
439    // The health information describes the report details, like the source ID, the property,
440    // the health state and other relevant details. The partition uses an internal health client
441    // to send the reports to the health store. The client optimizes messages to Health Manager
442    // by batching reports per a configured duration (Default: 30 seconds). If the report has high priority,
443    // you can specify send options to send it immediately.
444
445    /// Reports current partition health.
446    pub fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
447        let healthinfo_ref = &healthinfo.into();
448        unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
449    }
450
451    /// Reports health on the current stateful service replica of the partition.
452    pub fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
453        let healthinfo_ref = &healthinfo.into();
454        unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
455    }
456}
457
458impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
459    fn from(e: &IFabricStatefulServicePartition3) -> Self {
460        StatefulServicePartition {
461            com_impl: e.clone(),
462        }
463    }
464}