mssf_core/runtime/
stateful_proxy.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6// stateful_proxy is a wrapper layer around com api,
7// making manipulating com simple.
8
9use std::ffi::c_void;
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken, strings::StringResult};
12use mssf_com::FabricRuntime::{
13    IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14    IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18    error::ErrorCode,
19    sync::fabric_begin_end_proxy,
20    types::{
21        FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
22        ServicePartitionAccessStatus, ServicePartitionInformation,
23    },
24};
25
26use super::stateful::{PrimaryReplicator, Replicator, StatefulServiceReplica};
27use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
28
29pub struct StatefulServiceReplicaProxy {
30    com_impl: IFabricStatefulServiceReplica,
31}
32
33impl StatefulServiceReplicaProxy {
34    pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
35        StatefulServiceReplicaProxy { com_impl }
36    }
37}
38
39impl StatefulServiceReplica for StatefulServiceReplicaProxy {
40    #[cfg_attr(
41        feature = "tracing",
42        tracing::instrument(skip_all, level = "debug", err) // TODO: trace ret
43    )]
44    async fn open(
45        &self,
46        openmode: OpenMode,
47        partition: StatefulServicePartition,
48        cancellation_token: BoxedCancelToken,
49    ) -> crate::Result<impl PrimaryReplicator> {
50        let com1 = &self.com_impl;
51        let com2 = self.com_impl.clone();
52        let rx = fabric_begin_end_proxy(
53            move |callback| unsafe {
54                com1.BeginOpen(openmode.into(), partition.get_com(), callback)
55            },
56            move |ctx| unsafe { com2.EndOpen(ctx) },
57            Some(cancellation_token),
58        );
59        let rplctr = rx.await??;
60
61        // Check COM interface is implemented.
62        let catchup_specific_quorum = rplctr
63            .cast::<IFabricReplicatorCatchupSpecificQuorum>()
64            .is_ok();
65        assert!(
66            catchup_specific_quorum,
67            "mssf does not support replicator without catchup_specific_quorum interface"
68        );
69
70        // TODO: cast without clone will cause access violation on AddRef in SF runtime.
71        let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); // must work
72        // Replicator must impl primary replicator as well.
73
74        let res = PrimaryReplicatorProxy::new(p_rplctr);
75        Ok(res)
76    }
77
78    #[cfg_attr(
79        feature = "tracing",
80        tracing::instrument(skip_all, level = "debug", ret, err)
81    )]
82    async fn change_role(
83        &self,
84        newrole: ReplicaRole,
85        cancellation_token: BoxedCancelToken,
86    ) -> crate::Result<WString> {
87        // replica address
88        let com1 = &self.com_impl;
89        let com2 = self.com_impl.clone();
90        let rx = fabric_begin_end_proxy(
91            move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
92            move |ctx| unsafe { com2.EndChangeRole(ctx) },
93            Some(cancellation_token),
94        );
95        let addr = rx.await??;
96        Ok(StringResult::from(&addr).into_inner())
97    }
98
99    #[cfg_attr(
100        feature = "tracing",
101        tracing::instrument(skip_all, level = "debug", ret, err)
102    )]
103    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
104        let com1 = &self.com_impl;
105        let com2 = self.com_impl.clone();
106        let rx = fabric_begin_end_proxy(
107            move |callback| unsafe { com1.BeginClose(callback) },
108            move |ctx| unsafe { com2.EndClose(ctx) },
109            Some(cancellation_token),
110        );
111        rx.await?.map_err(crate::Error::from)
112    }
113    #[cfg_attr(
114        feature = "tracing",
115        tracing::instrument(skip_all, level = "debug", ret)
116    )]
117    fn abort(&self) {
118        unsafe { self.com_impl.Abort() }
119    }
120}
121
122pub struct ReplicatorProxy {
123    com_impl: IFabricReplicator,
124}
125
126impl ReplicatorProxy {
127    fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
128        ReplicatorProxy { com_impl }
129    }
130}
131
132impl Replicator for ReplicatorProxy {
133    #[cfg_attr(
134        feature = "tracing",
135        tracing::instrument(skip_all, level = "debug", ret, err)
136    )]
137    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
138        // replicator address
139        let com1 = &self.com_impl;
140        let com2 = self.com_impl.clone();
141        let rx = fabric_begin_end_proxy(
142            move |callback| unsafe { com1.BeginOpen(callback) },
143            move |ctx| unsafe { com2.EndOpen(ctx) },
144            Some(cancellation_token),
145        );
146        let addr = rx.await??;
147        Ok(StringResult::from(&addr).into_inner())
148    }
149    #[cfg_attr(
150        feature = "tracing",
151        tracing::instrument(skip_all, level = "debug", ret, err)
152    )]
153    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
154        let com1 = &self.com_impl;
155        let com2 = self.com_impl.clone();
156        let rx = fabric_begin_end_proxy(
157            move |callback| unsafe { com1.BeginClose(callback) },
158            move |ctx| unsafe { com2.EndClose(ctx) },
159            Some(cancellation_token),
160        );
161        rx.await?.map_err(crate::Error::from)
162    }
163    #[cfg_attr(
164        feature = "tracing",
165        tracing::instrument(skip_all, level = "debug", ret, err)
166    )]
167    async fn change_role(
168        &self,
169        epoch: Epoch,
170        role: ReplicaRole,
171        cancellation_token: BoxedCancelToken,
172    ) -> crate::Result<()> {
173        let com1 = &self.com_impl;
174        let com2 = self.com_impl.clone();
175        let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
176        let rx = fabric_begin_end_proxy(
177            move |callback| unsafe {
178                com1.BeginChangeRole(&fabric_epoch, (&role).into(), callback)
179            },
180            move |ctx| unsafe { com2.EndChangeRole(ctx) },
181            Some(cancellation_token),
182        );
183        rx.await?.map_err(crate::Error::from)
184    }
185    #[cfg_attr(
186        feature = "tracing",
187        tracing::instrument(skip_all, level = "debug", ret, err)
188    )]
189    async fn update_epoch(
190        &self,
191        epoch: Epoch,
192        cancellation_token: BoxedCancelToken,
193    ) -> crate::Result<()> {
194        let com1 = &self.com_impl;
195        let com2 = self.com_impl.clone();
196        let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
197        let rx = fabric_begin_end_proxy(
198            move |callback| unsafe { com1.BeginUpdateEpoch(&fabric_epoch, callback) },
199            move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
200            Some(cancellation_token),
201        );
202        rx.await?.map_err(crate::Error::from)
203    }
204    #[cfg_attr(
205        feature = "tracing",
206        tracing::instrument(skip_all, level = "debug", ret, err)
207    )]
208    #[cfg_attr(
209        feature = "tracing",
210        tracing::instrument(skip_all, level = "debug", ret, err)
211    )]
212    fn get_current_progress(&self) -> crate::Result<i64> {
213        unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
214    }
215    #[cfg_attr(
216        feature = "tracing",
217        tracing::instrument(skip_all, level = "debug", ret, err)
218    )]
219    fn get_catch_up_capability(&self) -> crate::Result<i64> {
220        unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
221    }
222    #[cfg_attr(
223        feature = "tracing",
224        tracing::instrument(skip_all, level = "debug", ret)
225    )]
226    fn abort(&self) {
227        unsafe { self.com_impl.Abort() }
228    }
229}
230
231pub struct PrimaryReplicatorProxy {
232    com_impl: IFabricPrimaryReplicator,
233    parent: ReplicatorProxy,
234}
235
236impl PrimaryReplicatorProxy {
237    pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
238        let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
239        PrimaryReplicatorProxy { com_impl, parent }
240    }
241}
242
243impl Replicator for PrimaryReplicatorProxy {
244    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
245        self.parent.open(cancellation_token).await
246    }
247    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
248        self.parent.close(cancellation_token).await
249    }
250    async fn change_role(
251        &self,
252        epoch: Epoch,
253        role: ReplicaRole,
254        cancellation_token: BoxedCancelToken,
255    ) -> crate::Result<()> {
256        self.parent
257            .change_role(epoch, role, cancellation_token)
258            .await
259    }
260    async fn update_epoch(
261        &self,
262        epoch: Epoch,
263        cancellation_token: BoxedCancelToken,
264    ) -> crate::Result<()> {
265        self.parent.update_epoch(epoch, cancellation_token).await
266    }
267    fn get_current_progress(&self) -> crate::Result<i64> {
268        self.parent.get_current_progress()
269    }
270    fn get_catch_up_capability(&self) -> crate::Result<i64> {
271        self.parent.get_catch_up_capability()
272    }
273    fn abort(&self) {
274        self.parent.abort()
275    }
276}
277
278impl PrimaryReplicator for PrimaryReplicatorProxy {
279    #[cfg_attr(
280        feature = "tracing",
281        tracing::instrument(skip_all, level = "debug", ret, err)
282    )]
283    async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
284        let com1 = &self.com_impl;
285        let com2 = self.com_impl.clone();
286        let rx = fabric_begin_end_proxy(
287            move |callback| unsafe { com1.BeginOnDataLoss(callback) },
288            move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
289            Some(cancellation_token),
290        );
291        rx.await?.map_err(crate::Error::from)
292    }
293    #[cfg_attr(
294        feature = "tracing",
295        tracing::instrument(skip_all, level = "debug", ret, err)
296    )]
297    fn update_catch_up_replica_set_configuration(
298        &self,
299        currentconfiguration: ReplicaSetConfig,
300        previousconfiguration: ReplicaSetConfig,
301    ) -> crate::Result<()> {
302        let cc_view = currentconfiguration.get_view();
303        let pc_view = previousconfiguration.get_view();
304        unsafe {
305            self.com_impl
306                .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
307        }
308        .map_err(crate::Error::from)
309    }
310    #[cfg_attr(
311        feature = "tracing",
312        tracing::instrument(skip_all, level = "debug", ret, err)
313    )]
314    async fn wait_for_catch_up_quorum(
315        &self,
316        catchupmode: ReplicaSetQuorumMode,
317        cancellation_token: BoxedCancelToken,
318    ) -> crate::Result<()> {
319        let com1 = &self.com_impl;
320        let com2 = self.com_impl.clone();
321        let rx = fabric_begin_end_proxy(
322            move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
323            move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
324            Some(cancellation_token),
325        );
326        rx.await?.map_err(crate::Error::from)
327    }
328    #[cfg_attr(
329        feature = "tracing",
330        tracing::instrument(skip_all, level = "debug", ret, err)
331    )]
332    fn update_current_replica_set_configuration(
333        &self,
334        currentconfiguration: ReplicaSetConfig,
335    ) -> crate::Result<()> {
336        unsafe {
337            self.com_impl
338                .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
339        }
340        .map_err(crate::Error::from)
341    }
342    #[cfg_attr(
343        feature = "tracing",
344        tracing::instrument(skip_all, level = "debug", ret, err)
345    )]
346    async fn build_replica(
347        &self,
348        replica: ReplicaInformation,
349        cancellation_token: BoxedCancelToken,
350    ) -> crate::Result<()> {
351        let com1 = &self.com_impl;
352        let com2 = self.com_impl.clone();
353        let rx = fabric_begin_end_proxy(
354            move |callback| {
355                let (mut info, ex1) = replica.get_raw_parts();
356                info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
357                unsafe { com1.BeginBuildReplica(&info, callback) }
358            },
359            move |ctx| unsafe { com2.EndBuildReplica(ctx) },
360            Some(cancellation_token),
361        );
362        rx.await?.map_err(crate::Error::from)
363    }
364    #[cfg_attr(
365        feature = "tracing",
366        tracing::instrument(skip_all, level = "debug", ret, err)
367    )]
368    fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
369        unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
370    }
371}
372
373/// Proxy COM object IFabricStatefulServicePartition3
374#[derive(Debug, Clone)]
375pub struct StatefulServicePartition {
376    com_impl: IFabricStatefulServicePartition3,
377}
378
379impl StatefulServicePartition {
380    pub fn get_com(&self) -> &IFabricStatefulServicePartition3 {
381        &self.com_impl
382    }
383
384    /// Provides access to the ServicePartitionInformation of the service, which contains the partition type and ID.
385    pub fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
386        unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
387            .ok_or(ErrorCode::E_POINTER.into())
388            .map(ServicePartitionInformation::from)
389    }
390
391    /// Used to check the readiness of the replica in regard to read operations.
392    /// The ReadStatus should be checked before the replica is servicing a customer request that is a read operation.
393    pub fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
394        unsafe { self.com_impl.GetReadStatus() }
395            .map(ServicePartitionAccessStatus::from)
396            .map_err(crate::Error::from)
397    }
398
399    /// Used to check the readiness of the partition in regard to write operations.
400    /// The WriteStatus should be checked before the replica services a customer request that is a write operation.
401    pub fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
402        unsafe { self.com_impl.GetWriteStatus() }
403            .map(ServicePartitionAccessStatus::from)
404            .map_err(crate::Error::from)
405    }
406
407    /// TODO: not implemented
408    /// Creates a FabricReplicator with the specified settings and returns it to the replica.
409    pub fn create_replicator(&self) -> crate::Result<()> {
410        Err(ErrorCode::E_NOTIMPL.into())
411    }
412
413    /// Reports load for the current replica in the partition.
414    /// Remarks:
415    /// The reported metrics should correspond to those that are provided in the ServiceLoadMetricDescription
416    /// as a part of the ServiceDescription that is used to create the service. Load metrics that are not
417    /// present in the description are ignored. Reporting custom metrics allows Service Fabric to balance
418    /// services that are based on additional custom information.
419    pub fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
420        let metrics_ref = LoadMetricListRef::from_slice(metrics);
421        let raw = metrics_ref.as_raw_slice();
422        unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
423    }
424
425    /// Enables the replica to report a fault to the runtime and indicates that it has encountered
426    /// an error from which it cannot recover and must either be restarted or removed.
427    pub fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
428        unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
429    }
430
431    /// Reports the move cost for a replica.
432    /// Remarks:
433    /// Services can report move cost of a replica using this method.
434    /// While the Service Fabric Resource Balances searches for the best balance in the cluster,
435    /// it examines both load information and move cost of each replica.
436    /// Resource balances will prefer to move replicas with lower cost in order to achieve balance.
437    pub fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
438        unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
439    }
440
441    // Remarks:
442    // The health information describes the report details, like the source ID, the property,
443    // the health state and other relevant details. The partition uses an internal health client
444    // to send the reports to the health store. The client optimizes messages to Health Manager
445    // by batching reports per a configured duration (Default: 30 seconds). If the report has high priority,
446    // you can specify send options to send it immediately.
447
448    /// Reports current partition health.
449    pub fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
450        let healthinfo_ref = &healthinfo.into();
451        unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
452    }
453
454    /// Reports health on the current stateful service replica of the partition.
455    pub fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
456        let healthinfo_ref = &healthinfo.into();
457        unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
458    }
459}
460
461impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
462    fn from(e: &IFabricStatefulServicePartition3) -> Self {
463        StatefulServicePartition {
464            com_impl: e.clone(),
465        }
466    }
467}