mssf_util/
data.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6//! mssf data utilities and extensions
7//!
8
9use mssf_core::{
10    WString,
11    runtime::{
12        executor::BoxedCancelToken,
13        stateful::{PrimaryReplicator, Replicator},
14        stateful_proxy::StatefulServicePartition,
15    },
16    types::{
17        Epoch, ReplicaInformation, ReplicaRole, ReplicaSetConfig, ReplicaSetQuorumMode,
18        ServicePartitionAccessStatus,
19    },
20};
21
22/// An empty replicator that does nothing. Useful for services without
23/// replication needs.
24/// Traces are added for all methods for easier debugging.
25#[derive(Clone)]
26pub struct EmptyReplicator {
27    name: WString,
28    partition: Option<StatefulServicePartition>,
29}
30
31impl EmptyReplicator {
32    /// Get read status for tracing.
33    fn read_status(&self) -> Option<ServicePartitionAccessStatus> {
34        self.partition
35            .as_ref()
36            .map(|p| p.get_read_status().ok())
37            .unwrap_or(None)
38    }
39
40    /// Get write status for tracing.
41    fn write_status(&self) -> Option<ServicePartitionAccessStatus> {
42        self.partition
43            .as_ref()
44            .map(|p| p.get_write_status().ok())
45            .unwrap_or(None)
46    }
47}
48
49/// Make it short for tracing purpose
50impl std::fmt::Debug for EmptyReplicator {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        write!(f, "EmptyReplicator-{}", self.name)
53    }
54}
55
56impl EmptyReplicator {
57    /// Create a new empty replicator with a name for tracing purpose.
58    pub fn new(name: WString, partition: Option<StatefulServicePartition>) -> EmptyReplicator {
59        EmptyReplicator { name, partition }
60    }
61}
62
63// This is basic implementation of Replicator
64impl Replicator for EmptyReplicator {
65    #[tracing::instrument(err, ret)]
66    async fn open(&self, _: BoxedCancelToken) -> mssf_core::Result<WString> {
67        // Empty replicator does not listen on any address
68        Ok(WString::from("NoProtocol://localhost:0"))
69    }
70
71    #[tracing::instrument(err, ret)]
72    async fn close(&self, _: BoxedCancelToken) -> mssf_core::Result<()> {
73        Ok(())
74    }
75
76    #[tracing::instrument(fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
77    async fn change_role(
78        &self,
79        epoch: &Epoch,
80        role: &ReplicaRole,
81        _: BoxedCancelToken,
82    ) -> mssf_core::Result<()> {
83        Ok(())
84    }
85
86    #[tracing::instrument(fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
87    async fn update_epoch(&self, epoch: &Epoch, _: BoxedCancelToken) -> mssf_core::Result<()> {
88        Ok(())
89    }
90
91    #[tracing::instrument(err, ret)]
92    fn get_current_progress(&self) -> mssf_core::Result<i64> {
93        Ok(1)
94    }
95
96    #[tracing::instrument(err, ret)]
97    fn get_catch_up_capability(&self) -> mssf_core::Result<i64> {
98        Ok(1)
99    }
100
101    #[tracing::instrument(skip(self))]
102    fn abort(&self) {
103        tracing::info!("abort");
104    }
105}
106
107// This is basic implementation of PrimaryReplicator
108impl PrimaryReplicator for EmptyReplicator {
109    async fn on_data_loss(&self, _: BoxedCancelToken) -> mssf_core::Result<u8> {
110        Ok(0)
111    }
112
113    #[tracing::instrument(err, ret)]
114    fn update_catch_up_replica_set_configuration(
115        &self,
116        currentconfiguration: &ReplicaSetConfig,
117        previousconfiguration: &ReplicaSetConfig,
118    ) -> mssf_core::Result<()> {
119        Ok(())
120    }
121
122    #[tracing::instrument(fields(read_status = ?self.read_status(), write_status = ?self.write_status()), err, ret)]
123    async fn wait_for_catch_up_quorum(
124        &self,
125        catchupmode: ReplicaSetQuorumMode,
126        _: BoxedCancelToken,
127    ) -> mssf_core::Result<()> {
128        // Before demoting a primary to active secondary in graceful failover (MovePrimary api FabricClient trigger),
129        // (R:G, W:P) means read status granted, write status reconfiguration pending.
130        // NA means status NotPrimary.
131        // SF calls this in order:
132        // * update_catch_up_replica_set_configuration
133        // * wait_for_catch_up_quorum write mode, with (R:G, W:G).
134        //   app should catch up making necessary writes. (For example: complete transaction?)
135        //   This may take forever depends on the implementation, if write is faster than catch up.
136        //   App can ignore this call and let the next catch up call handle it all, if the app
137        //   does not need to do write while catching up.
138        // * update epoch,(R:G, W:P). SF revokes write status for the service.
139        // * update_catch_up_replica_set_configuration, with (R:G, W:P)
140        // * wait_for_catch_up_quorum, with (R:G, W:P).
141        //   app should catch up knowing that user/client is not able to write.
142        // * change_role from Primary to ActiveSecondary, with the same epoch from update epoch. (R:NA,W:NA)
143
144        // For newly created or promoted Primary, status starts with ChangeRole Primary (R:P, W:P)
145        // * update_catch_up_replica_set_configuration (R:P, W:P)
146        // * wait_for_catch_up_quorum (R:P, W:P)
147        // * update_current_replica_set_configuration (R:G, W:G)
148        Ok(())
149    }
150
151    #[tracing::instrument(err, ret)]
152    fn update_current_replica_set_configuration(
153        &self,
154        currentconfiguration: &ReplicaSetConfig,
155    ) -> mssf_core::Result<()> {
156        Ok(())
157    }
158
159    #[tracing::instrument(err, ret)]
160    async fn build_replica(
161        &self,
162        replica: &ReplicaInformation,
163        _: BoxedCancelToken,
164    ) -> mssf_core::Result<()> {
165        Ok(())
166    }
167
168    #[tracing::instrument(err, ret)]
169    fn remove_replica(&self, _replicaid: i64) -> mssf_core::Result<()> {
170        Ok(())
171    }
172}