Skip to main content

mssf_core/runtime/
stateful_proxy.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
// stateful_proxy is a wrapper layer around the COM API,
// making COM objects simpler to work with.
8
9use std::{ffi::c_void, sync::Arc};
10
11use crate::{Interface, WString, runtime::executor::BoxedCancelToken, strings::StringResult};
12use mssf_com::FabricRuntime::{
13    IFabricPrimaryReplicator, IFabricReplicator, IFabricReplicatorCatchupSpecificQuorum,
14    IFabricStatefulServicePartition3, IFabricStatefulServiceReplica,
15};
16
17use crate::{
18    error::ErrorCode,
19    sync::fabric_begin_end_proxy,
20    types::{
21        FaultType, HealthInformation, LoadMetric, LoadMetricListRef, MoveCost, ReplicaRole,
22        ServicePartitionAccessStatus, ServicePartitionInformation,
23    },
24};
25
26use super::{IPrimaryReplicator, IReplicator, IStatefulServiceReplica};
27use crate::types::{Epoch, OpenMode, ReplicaInformation, ReplicaSetConfig, ReplicaSetQuorumMode};
28
29pub struct StatefulServiceReplicaProxy {
30    com_impl: IFabricStatefulServiceReplica,
31}
32
33impl StatefulServiceReplicaProxy {
34    pub fn new(com_impl: IFabricStatefulServiceReplica) -> StatefulServiceReplicaProxy {
35        StatefulServiceReplicaProxy { com_impl }
36    }
37}
38
39#[async_trait::async_trait]
40impl IStatefulServiceReplica for StatefulServiceReplicaProxy {
41    #[cfg_attr(
42        feature = "tracing",
43        tracing::instrument(skip_all, level = "debug", err) // TODO: trace ret
44    )]
45    async fn open(
46        &self,
47        openmode: OpenMode,
48        partition: Arc<dyn super::IStatefulServicePartition>,
49        cancellation_token: BoxedCancelToken,
50    ) -> crate::Result<Box<dyn IPrimaryReplicator>> {
51        let com1 = &self.com_impl;
52        let com2 = self.com_impl.clone();
53        let rx = fabric_begin_end_proxy(
54            move |callback| unsafe {
55                com1.BeginOpen(
56                    openmode.into(),
57                    partition.try_get_com().expect("must support com"),
58                    callback,
59                )
60            },
61            move |ctx| unsafe { com2.EndOpen(ctx) },
62            Some(cancellation_token),
63        );
64        let rplctr = rx.await??;
65
66        // Check COM interface is implemented.
67        let catchup_specific_quorum = rplctr
68            .cast::<IFabricReplicatorCatchupSpecificQuorum>()
69            .is_ok();
70        assert!(
71            catchup_specific_quorum,
72            "mssf does not support replicator without catchup_specific_quorum interface"
73        );
74
75        // TODO: cast without clone will cause access violation on AddRef in SF runtime.
76        let p_rplctr: IFabricPrimaryReplicator = rplctr.clone().cast().unwrap(); // must work
77        // Replicator must impl primary replicator as well.
78
79        let res = Box::new(PrimaryReplicatorProxy::new(p_rplctr));
80        Ok(res)
81    }
82
83    #[cfg_attr(
84        feature = "tracing",
85        tracing::instrument(skip_all, level = "debug", ret, err)
86    )]
87    async fn change_role(
88        &self,
89        newrole: ReplicaRole,
90        cancellation_token: BoxedCancelToken,
91    ) -> crate::Result<WString> {
92        // replica address
93        let com1 = &self.com_impl;
94        let com2 = self.com_impl.clone();
95        let rx = fabric_begin_end_proxy(
96            move |callback| unsafe { com1.BeginChangeRole((&newrole).into(), callback) },
97            move |ctx| unsafe { com2.EndChangeRole(ctx) },
98            Some(cancellation_token),
99        );
100        let addr = rx.await??;
101        Ok(StringResult::from(&addr).into_inner())
102    }
103
104    #[cfg_attr(
105        feature = "tracing",
106        tracing::instrument(skip_all, level = "debug", ret, err)
107    )]
108    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
109        let com1 = &self.com_impl;
110        let com2 = self.com_impl.clone();
111        let rx = fabric_begin_end_proxy(
112            move |callback| unsafe { com1.BeginClose(callback) },
113            move |ctx| unsafe { com2.EndClose(ctx) },
114            Some(cancellation_token),
115        );
116        rx.await?.map_err(crate::Error::from)
117    }
118    #[cfg_attr(
119        feature = "tracing",
120        tracing::instrument(skip_all, level = "debug", ret)
121    )]
122    fn abort(&self) {
123        unsafe { self.com_impl.Abort() }
124    }
125}
126
127pub struct ReplicatorProxy {
128    com_impl: IFabricReplicator,
129}
130
131impl ReplicatorProxy {
132    fn new(com_impl: IFabricReplicator) -> ReplicatorProxy {
133        ReplicatorProxy { com_impl }
134    }
135}
136
137#[async_trait::async_trait]
138impl IReplicator for ReplicatorProxy {
139    #[cfg_attr(
140        feature = "tracing",
141        tracing::instrument(skip_all, level = "debug", ret, err)
142    )]
143    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
144        // replicator address
145        let com1 = &self.com_impl;
146        let com2 = self.com_impl.clone();
147        let rx = fabric_begin_end_proxy(
148            move |callback| unsafe { com1.BeginOpen(callback) },
149            move |ctx| unsafe { com2.EndOpen(ctx) },
150            Some(cancellation_token),
151        );
152        let addr = rx.await??;
153        Ok(StringResult::from(&addr).into_inner())
154    }
155    #[cfg_attr(
156        feature = "tracing",
157        tracing::instrument(skip_all, level = "debug", ret, err)
158    )]
159    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
160        let com1 = &self.com_impl;
161        let com2 = self.com_impl.clone();
162        let rx = fabric_begin_end_proxy(
163            move |callback| unsafe { com1.BeginClose(callback) },
164            move |ctx| unsafe { com2.EndClose(ctx) },
165            Some(cancellation_token),
166        );
167        rx.await?.map_err(crate::Error::from)
168    }
169    #[cfg_attr(
170        feature = "tracing",
171        tracing::instrument(skip_all, level = "debug", ret, err)
172    )]
173    async fn change_role(
174        &self,
175        epoch: Epoch,
176        role: ReplicaRole,
177        cancellation_token: BoxedCancelToken,
178    ) -> crate::Result<()> {
179        let com1 = &self.com_impl;
180        let com2 = self.com_impl.clone();
181        let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
182        let rx = fabric_begin_end_proxy(
183            move |callback| unsafe {
184                com1.BeginChangeRole(&fabric_epoch, (&role).into(), callback)
185            },
186            move |ctx| unsafe { com2.EndChangeRole(ctx) },
187            Some(cancellation_token),
188        );
189        rx.await?.map_err(crate::Error::from)
190    }
191    #[cfg_attr(
192        feature = "tracing",
193        tracing::instrument(skip_all, level = "debug", ret, err)
194    )]
195    async fn update_epoch(
196        &self,
197        epoch: Epoch,
198        cancellation_token: BoxedCancelToken,
199    ) -> crate::Result<()> {
200        let com1 = &self.com_impl;
201        let com2 = self.com_impl.clone();
202        let fabric_epoch: mssf_com::FabricTypes::FABRIC_EPOCH = (&epoch).into();
203        let rx = fabric_begin_end_proxy(
204            move |callback| unsafe { com1.BeginUpdateEpoch(&fabric_epoch, callback) },
205            move |ctx| unsafe { com2.EndUpdateEpoch(ctx) },
206            Some(cancellation_token),
207        );
208        rx.await?.map_err(crate::Error::from)
209    }
210    #[cfg_attr(
211        feature = "tracing",
212        tracing::instrument(skip_all, level = "debug", ret, err)
213    )]
214    #[cfg_attr(
215        feature = "tracing",
216        tracing::instrument(skip_all, level = "debug", ret, err)
217    )]
218    fn get_current_progress(&self) -> crate::Result<i64> {
219        unsafe { self.com_impl.GetCurrentProgress() }.map_err(crate::Error::from)
220    }
221    #[cfg_attr(
222        feature = "tracing",
223        tracing::instrument(skip_all, level = "debug", ret, err)
224    )]
225    fn get_catch_up_capability(&self) -> crate::Result<i64> {
226        unsafe { self.com_impl.GetCatchUpCapability() }.map_err(crate::Error::from)
227    }
228    #[cfg_attr(
229        feature = "tracing",
230        tracing::instrument(skip_all, level = "debug", ret)
231    )]
232    fn abort(&self) {
233        unsafe { self.com_impl.Abort() }
234    }
235}
236
237pub struct PrimaryReplicatorProxy {
238    com_impl: IFabricPrimaryReplicator,
239    parent: ReplicatorProxy,
240}
241
242impl PrimaryReplicatorProxy {
243    pub fn new(com_impl: IFabricPrimaryReplicator) -> PrimaryReplicatorProxy {
244        let parent = ReplicatorProxy::new(com_impl.clone().cast().unwrap());
245        PrimaryReplicatorProxy { com_impl, parent }
246    }
247}
248
249#[async_trait::async_trait]
250impl IReplicator for PrimaryReplicatorProxy {
251    async fn open(&self, cancellation_token: BoxedCancelToken) -> crate::Result<WString> {
252        self.parent.open(cancellation_token).await
253    }
254    async fn close(&self, cancellation_token: BoxedCancelToken) -> crate::Result<()> {
255        self.parent.close(cancellation_token).await
256    }
257    async fn change_role(
258        &self,
259        epoch: Epoch,
260        role: ReplicaRole,
261        cancellation_token: BoxedCancelToken,
262    ) -> crate::Result<()> {
263        self.parent
264            .change_role(epoch, role, cancellation_token)
265            .await
266    }
267    async fn update_epoch(
268        &self,
269        epoch: Epoch,
270        cancellation_token: BoxedCancelToken,
271    ) -> crate::Result<()> {
272        self.parent.update_epoch(epoch, cancellation_token).await
273    }
274    fn get_current_progress(&self) -> crate::Result<i64> {
275        self.parent.get_current_progress()
276    }
277    fn get_catch_up_capability(&self) -> crate::Result<i64> {
278        self.parent.get_catch_up_capability()
279    }
280    fn abort(&self) {
281        self.parent.abort()
282    }
283}
284
285#[async_trait::async_trait]
286impl IPrimaryReplicator for PrimaryReplicatorProxy {
287    #[cfg_attr(
288        feature = "tracing",
289        tracing::instrument(skip_all, level = "debug", ret, err)
290    )]
291    async fn on_data_loss(&self, cancellation_token: BoxedCancelToken) -> crate::Result<u8> {
292        let com1 = &self.com_impl;
293        let com2 = self.com_impl.clone();
294        let rx = fabric_begin_end_proxy(
295            move |callback| unsafe { com1.BeginOnDataLoss(callback) },
296            move |ctx| unsafe { com2.EndOnDataLoss(ctx) },
297            Some(cancellation_token),
298        );
299        rx.await?.map_err(crate::Error::from)
300    }
301    #[cfg_attr(
302        feature = "tracing",
303        tracing::instrument(skip_all, level = "debug", ret, err)
304    )]
305    fn update_catch_up_replica_set_configuration(
306        &self,
307        currentconfiguration: ReplicaSetConfig,
308        previousconfiguration: ReplicaSetConfig,
309    ) -> crate::Result<()> {
310        let cc_view = currentconfiguration.get_view();
311        let pc_view = previousconfiguration.get_view();
312        unsafe {
313            self.com_impl
314                .UpdateCatchUpReplicaSetConfiguration(cc_view.get_raw(), pc_view.get_raw())
315        }
316        .map_err(crate::Error::from)
317    }
318    #[cfg_attr(
319        feature = "tracing",
320        tracing::instrument(skip_all, level = "debug", ret, err)
321    )]
322    async fn wait_for_catch_up_quorum(
323        &self,
324        catchupmode: ReplicaSetQuorumMode,
325        cancellation_token: BoxedCancelToken,
326    ) -> crate::Result<()> {
327        let com1 = &self.com_impl;
328        let com2 = self.com_impl.clone();
329        let rx = fabric_begin_end_proxy(
330            move |callback| unsafe { com1.BeginWaitForCatchUpQuorum(catchupmode.into(), callback) },
331            move |ctx| unsafe { com2.EndWaitForCatchUpQuorum(ctx) },
332            Some(cancellation_token),
333        );
334        rx.await?.map_err(crate::Error::from)
335    }
336    #[cfg_attr(
337        feature = "tracing",
338        tracing::instrument(skip_all, level = "debug", ret, err)
339    )]
340    fn update_current_replica_set_configuration(
341        &self,
342        currentconfiguration: ReplicaSetConfig,
343    ) -> crate::Result<()> {
344        unsafe {
345            self.com_impl
346                .UpdateCurrentReplicaSetConfiguration(currentconfiguration.get_view().get_raw())
347        }
348        .map_err(crate::Error::from)
349    }
350    #[cfg_attr(
351        feature = "tracing",
352        tracing::instrument(skip_all, level = "debug", ret, err)
353    )]
354    async fn build_replica(
355        &self,
356        replica: ReplicaInformation,
357        cancellation_token: BoxedCancelToken,
358    ) -> crate::Result<()> {
359        let com1 = &self.com_impl;
360        let com2 = self.com_impl.clone();
361        let rx = fabric_begin_end_proxy(
362            move |callback| {
363                let (mut info, ex1) = replica.get_raw_parts();
364                info.Reserved = std::ptr::addr_of!(ex1) as *mut c_void;
365                unsafe { com1.BeginBuildReplica(&info, callback) }
366            },
367            move |ctx| unsafe { com2.EndBuildReplica(ctx) },
368            Some(cancellation_token),
369        );
370        rx.await?.map_err(crate::Error::from)
371    }
372    #[cfg_attr(
373        feature = "tracing",
374        tracing::instrument(skip_all, level = "debug", ret, err)
375    )]
376    fn remove_replica(&self, replicaid: i64) -> crate::Result<()> {
377        unsafe { self.com_impl.RemoveReplica(replicaid) }.map_err(crate::Error::from)
378    }
379}
380
/// Proxy for the COM object `IFabricStatefulServicePartition3`.
#[derive(Debug, Clone)]
pub struct StatefulServicePartition {
    /// Wrapped COM partition interface.
    com_impl: IFabricStatefulServicePartition3,
}
386
387impl super::IStatefulServicePartition for StatefulServicePartition {
388    fn get_partition_information(&self) -> crate::Result<ServicePartitionInformation> {
389        unsafe { self.com_impl.GetPartitionInfo()?.as_ref() }
390            .ok_or(ErrorCode::E_POINTER.into())
391            .map(ServicePartitionInformation::from)
392    }
393
394    fn get_read_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
395        unsafe { self.com_impl.GetReadStatus() }
396            .map(ServicePartitionAccessStatus::from)
397            .map_err(crate::Error::from)
398    }
399
400    fn get_write_status(&self) -> crate::Result<ServicePartitionAccessStatus> {
401        unsafe { self.com_impl.GetWriteStatus() }
402            .map(ServicePartitionAccessStatus::from)
403            .map_err(crate::Error::from)
404    }
405
406    /// TODO: not implemented
407    fn create_replicator(&self) -> crate::Result<Box<dyn IPrimaryReplicator>> {
408        Err(ErrorCode::E_NOTIMPL.into())
409    }
410
411    fn report_load(&self, metrics: &[LoadMetric]) -> crate::Result<()> {
412        let metrics_ref = LoadMetricListRef::from_slice(metrics);
413        let raw = metrics_ref.as_raw_slice();
414        unsafe { self.com_impl.ReportLoad(raw) }.map_err(crate::Error::from)
415    }
416
417    fn report_fault(&self, fault_type: FaultType) -> crate::Result<()> {
418        unsafe { self.com_impl.ReportFault(fault_type.into()) }.map_err(crate::Error::from)
419    }
420
421    fn report_move_cost(&self, move_cost: MoveCost) -> crate::Result<()> {
422        unsafe { self.com_impl.ReportMoveCost(move_cost.into()) }.map_err(crate::Error::from)
423    }
424
425    fn report_partition_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
426        let healthinfo_ref = &healthinfo.into();
427        unsafe { self.com_impl.ReportPartitionHealth(healthinfo_ref) }.map_err(crate::Error::from)
428    }
429
430    fn report_replica_health(&self, healthinfo: &HealthInformation) -> crate::Result<()> {
431        let healthinfo_ref = &healthinfo.into();
432        unsafe { self.com_impl.ReportReplicaHealth(healthinfo_ref) }.map_err(crate::Error::from)
433    }
434
435    fn try_get_com(
436        &self,
437    ) -> crate::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
438        Ok(&self.com_impl)
439    }
440}
441
442impl From<&IFabricStatefulServicePartition3> for StatefulServicePartition {
443    fn from(e: &IFabricStatefulServicePartition3) -> Self {
444        StatefulServicePartition {
445            com_impl: e.clone(),
446        }
447    }
448}