Skip to main content

mssf_util/
data.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6//! mssf data utilities and extensions
7//!
8
9use std::sync::Arc;
10
11use mssf_core::{
12    WString,
13    runtime::{
14        IPrimaryReplicator, IReplicator, IStatefulServicePartition, executor::BoxedCancelToken,
15    },
16    types::{
17        Epoch, ReplicaInformation, ReplicaRole, ReplicaSetConfig, ReplicaSetQuorumMode,
18        ServicePartitionAccessStatus,
19    },
20};
21
22/// An empty replicator that does nothing. Useful for services without
23/// replication needs.
24/// Traces are added for all methods for easier debugging.
25#[derive(Clone)]
26pub struct EmptyReplicator {
27    name: WString,
28    partition: Option<Arc<dyn IStatefulServicePartition>>,
29}
30
31impl EmptyReplicator {
32    /// Get read status for tracing.
33    fn read_status(&self) -> Option<ServicePartitionAccessStatus> {
34        self.partition
35            .as_ref()
36            .map(|p| p.get_read_status().ok())
37            .unwrap_or(None)
38    }
39
40    /// Get write status for tracing.
41    fn write_status(&self) -> Option<ServicePartitionAccessStatus> {
42        self.partition
43            .as_ref()
44            .map(|p| p.get_write_status().ok())
45            .unwrap_or(None)
46    }
47}
48
49/// Make it short for tracing purpose
50impl std::fmt::Debug for EmptyReplicator {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "EmptyReplicator-{}", self.name)
53    }
54}
55
56impl EmptyReplicator {
57    /// Create a new empty replicator with a name for tracing purpose.
58    pub fn new(
59        name: WString,
60        partition: Option<Arc<dyn IStatefulServicePartition>>,
61    ) -> EmptyReplicator {
62        EmptyReplicator { name, partition }
63    }
64}
65
66// This is basic implementation of Replicator
67#[mssf_core::async_trait]
68impl IReplicator for EmptyReplicator {
69    #[tracing::instrument(skip(_token), err, ret)]
70    async fn open(&self, _token: BoxedCancelToken) -> mssf_core::Result<WString> {
71        // Empty replicator does not listen on any address
72        Ok(WString::from("NoProtocol://localhost:0"))
73    }
74
75    #[tracing::instrument(skip(_token), err, ret)]
76    async fn close(&self, _token: BoxedCancelToken) -> mssf_core::Result<()> {
77        Ok(())
78    }
79
80    #[tracing::instrument(skip(_token), fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
81    async fn change_role(
82        &self,
83        _epoch: Epoch,
84        _role: ReplicaRole,
85        _token: BoxedCancelToken,
86    ) -> mssf_core::Result<()> {
87        Ok(())
88    }
89
90    #[tracing::instrument(skip(_token), fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
91    async fn update_epoch(&self, _epoch: Epoch, _token: BoxedCancelToken) -> mssf_core::Result<()> {
92        Ok(())
93    }
94
95    #[tracing::instrument(err, ret)]
96    fn get_current_progress(&self) -> mssf_core::Result<i64> {
97        Ok(1)
98    }
99
100    #[tracing::instrument(err, ret)]
101    fn get_catch_up_capability(&self) -> mssf_core::Result<i64> {
102        Ok(1)
103    }
104
105    #[tracing::instrument()]
106    fn abort(&self) {
107        tracing::info!("abort");
108    }
109}
110
111// This is basic implementation of PrimaryReplicator
112#[mssf_core::async_trait]
113impl IPrimaryReplicator for EmptyReplicator {
114    #[tracing::instrument(skip(_token), err, ret)]
115    async fn on_data_loss(&self, _token: BoxedCancelToken) -> mssf_core::Result<u8> {
116        Ok(0)
117    }
118
119    #[tracing::instrument(err, ret)]
120    fn update_catch_up_replica_set_configuration(
121        &self,
122        currentconfiguration: ReplicaSetConfig,
123        previousconfiguration: ReplicaSetConfig,
124    ) -> mssf_core::Result<()> {
125        Ok(())
126    }
127
128    #[tracing::instrument(skip(_token), fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
129    async fn wait_for_catch_up_quorum(
130        &self,
131        _catchupmode: ReplicaSetQuorumMode,
132        _token: BoxedCancelToken,
133    ) -> mssf_core::Result<()> {
134        // Before demoting a primary to active secondary in graceful failover (MovePrimary api FabricClient trigger),
135        // (R:G, W:P) means read status granted, write status reconfiguration pending.
136        // NA means status NotPrimary.
137        // SF calls this in order:
138        // * update_catch_up_replica_set_configuration
139        // * wait_for_catch_up_quorum write mode, with (R:G, W:G).
140        //   app should catch up making necessary writes. (For example: complete transaction?)
141        //   This may take forever depends on the implementation, if write is faster than catch up.
142        //   App can ignore this call and let the next catch up call handle it all, if the app
143        //   does not need to do write while catching up.
144        // * update epoch,(R:G, W:P). SF revokes write status for the service.
145        // * update_catch_up_replica_set_configuration, with (R:G, W:P)
146        // * wait_for_catch_up_quorum, with (R:G, W:P).
147        //   app should catch up knowing that user/client is not able to write.
148        // * change_role from Primary to ActiveSecondary, with the same epoch from update epoch. (R:NA,W:NA)
149
150        // For newly created or promoted Primary, status starts with ChangeRole Primary (R:P, W:P)
151        // * update_catch_up_replica_set_configuration (R:P, W:P)
152        // * wait_for_catch_up_quorum (R:P, W:P)
153        // * update_current_replica_set_configuration (R:G, W:G)
154        Ok(())
155    }
156
157    #[tracing::instrument(err, ret)]
158    fn update_current_replica_set_configuration(
159        &self,
160        currentconfiguration: ReplicaSetConfig,
161    ) -> mssf_core::Result<()> {
162        Ok(())
163    }
164
165    #[tracing::instrument(skip(_token), err, ret)]
166    async fn build_replica(
167        &self,
168        _replica: ReplicaInformation,
169        _token: BoxedCancelToken,
170    ) -> mssf_core::Result<()> {
171        Ok(())
172    }
173
174    #[tracing::instrument(err, ret)]
175    fn remove_replica(&self, _replicaid: i64) -> mssf_core::Result<()> {
176        Ok(())
177    }
178}