Skip to main content

mssf_util/mock/
stateful.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7    collections::{BTreeMap, HashMap},
8    sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12    GUID, WString,
13    runtime::IStatefulServicePartition,
14    sync::SimpleCancelToken,
15    types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20    info: mssf_core::types::ServicePartitionInformation,
21    read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22    write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26    pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27        Self {
28            info,
29            read_status: Arc::new(Mutex::new(
30                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31            )),
32            write_status: Arc::new(Mutex::new(
33                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34            )),
35        }
36    }
37    pub fn new_boxed(
38        info: mssf_core::types::ServicePartitionInformation,
39    ) -> Box<dyn IStatefulServicePartition> {
40        Box::new(Self::new(info))
41    }
42    pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43        *self.read_status.lock().unwrap() = status;
44    }
45    pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46        *self.write_status.lock().unwrap() = status;
47    }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51    fn create_replicator(
52        &self,
53    ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54        unimplemented!("Not implemented")
55    }
56
57    fn get_partition_information(
58        &self,
59    ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60        Ok(self.info.clone())
61    }
62
63    fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64        Ok(*self.read_status.lock().unwrap())
65    }
66
67    fn get_write_status(
68        &self,
69    ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70        Ok(*self.write_status.lock().unwrap())
71    }
72
73    fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74        Ok(())
75    }
76
77    fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78        Ok(())
79    }
80
81    fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82        Ok(())
83    }
84
85    fn report_partition_health(
86        &self,
87        _healthinfo: &mssf_core::types::HealthInformation,
88    ) -> mssf_core::Result<()> {
89        Ok(())
90    }
91
92    fn report_replica_health(
93        &self,
94        _healthinfo: &mssf_core::types::HealthInformation,
95    ) -> mssf_core::Result<()> {
96        Ok(())
97    }
98
99    fn try_get_com(
100        &self,
101    ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102        Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103    }
104}
105
/// Arguments describing the partition that
/// `StatefulServicePartitionDriver::create_service_partition` should create.
///
/// The driver keeps a clone of this value for as long as the partition exists and
/// reuses it when a replica has to be re-created on restart.
#[derive(Clone)]
pub struct CreateStatefulServicePartitionArg {
    /// Partition id passed to the factory for every replica and reported by the
    /// partition mock handed to each replica.
    pub partition_id: GUID,
    /// Number of replicas to create. Must be greater than zero (asserted by the driver).
    pub replica_count: usize,
    /// Initialization data passed unchanged to the factory's `create_replica`.
    pub init_data: Vec<u8>,
    /// Service name passed to the factory's `create_replica`.
    pub service_name: Uri,
    /// Service type name passed to the factory's `create_replica`.
    pub service_type_name: WString,
}
114
/// Test driver for a single stateful service partition.
///
/// The driver owns every replica of the partition (one primary plus any number of
/// secondaries) and walks them through the create, delete and restart workflows
/// by calling the replica and replicator interfaces directly.
pub struct StatefulServicePartitionDriver {
    /// This keeps track of which factory to use next.
    /// It only ever grows; it is reduced modulo the number of registered factories
    /// when a factory is picked.
    factory_index: i64,
    /// Registered factories, used in round robin order to create replicas.
    service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
    /// Next replica id to hand out. Starts at 1 and is never reset.
    replica_index: i64,
    epoch_index: Epoch, // Used to generate new epoch.
    /// Current state of the partition: its replicas, primary, epoch and configuration.
    partition_state: PartitionState,
}
124
/// Mutable state of the partition managed by the driver.
struct PartitionState {
    // states for all replicas, keyed by replica id.
    pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
    // Replica id of the primary; used as a key into `replica_states`.
    pub primary_index: i64,
    // Epoch passed to replicator `change_role` calls for this partition.
    pub epoch: Epoch,
    pub static_info: Option<CreateStatefulServicePartitionArg>, // Filled when created.
    // Write quorum and secondary replica list.
    // The primary itself is not listed in `replicas`.
    pub current_configuration: mssf_core::types::ReplicaSetConfig,
}
134
/// Everything the driver keeps for one live replica.
struct StatefulServiceReplicaState {
    // The replica instance created by the service factory.
    pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
    // The replicator returned when the replica was opened.
    pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
    // The partition mock handed to the replica on open; the driver keeps a handle
    // so it can change the read/write status the replica observes.
    pub partition: StatefulServicePartitionMock,
    pub factory_index: i64, // The index of the factory that created the replica
    // Address returned by the replica's last `change_role` call. Stored but not read.
    pub _replica_address: WString,
    // Address returned when the replicator was opened. Stored but not read.
    pub _replicator_address: WString,
}
143
144impl Default for StatefulServicePartitionDriver {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl StatefulServicePartitionDriver {
151    pub fn new() -> Self {
152        Self {
153            service_factory: Vec::new(),
154            factory_index: 0,
155            replica_index: 1, // replica id starting from 1
156            epoch_index: Epoch {
157                data_loss_number: 0,
158                configuration_number: 1,
159            },
160            partition_state: PartitionState {
161                replica_states: HashMap::new(),
162                primary_index: 1, // First replica is the primary.
163                epoch: Epoch {
164                    data_loss_number: 0,
165                    configuration_number: 1,
166                },
167                static_info: None,
168                current_configuration: mssf_core::types::ReplicaSetConfig {
169                    replicas: vec![],
170                    write_quorum: 0,
171                },
172            },
173        }
174    }
175
176    /// Register a service factory to be used to create replicas.
177    /// One should register multiple factories to simulate multi node scenarios.
178    /// Replicas are created in round robin fashion from the registered factories.
179    pub fn register_service_factory(
180        &mut self,
181        factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
182    ) {
183        self.service_factory.push(factory);
184    }
185
186    /// Get the next service factory in round robin fashion.
187    /// This ensures that multiple factories can be tested, to simulate
188    /// multi node scenarios.
189    /// Returns the current index and the factory.
190    fn get_round_robin_factory(
191        &mut self,
192    ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
193        assert!(!self.service_factory.is_empty());
194        let idx = self.factory_index as usize % self.service_factory.len();
195        self.factory_index += 1;
196        (idx as i64, &*self.service_factory[idx])
197    }
198
199    fn next_replica_index(&mut self) -> i64 {
200        let idx = self.replica_index;
201        self.replica_index += 1;
202        idx
203    }
204
205    fn next_epoch_index(&mut self) -> Epoch {
206        let idx = self.epoch_index.clone();
207        self.epoch_index.configuration_number += 1;
208        idx
209    }
210
211    fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
212        let state = self
213            .partition_state
214            .replica_states
215            .get(&self.partition_state.primary_index)
216            .ok_or_else(|| {
217                mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
218            })?;
219        Ok(state)
220    }
221
222    /// Check the invariants of the partition state.
223    /// Panics if any invariant is violated.
224    fn check_partition_state(&self) {
225        if self.partition_state.replica_states.is_empty() {
226            assert!(self.partition_state.static_info.is_none());
227            assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
228            assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
229            return;
230        }
231        // check primary exists
232        self.get_primary_state().unwrap();
233        // check quorum size matches
234        let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
235        assert_eq!(
236            self.partition_state.current_configuration.write_quorum,
237            expected_quorum
238        );
239    }
240}
241
242// Public Accessors
243impl StatefulServicePartitionDriver {
244    /// Get the current primary replica id.
245    pub fn get_primary_replica_id(&self) -> i64 {
246        self.partition_state.primary_index
247    }
248    /// Get a replica by id.
249    pub fn get_replica(
250        &self,
251        replica_id: i64,
252    ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
253        let state = self.partition_state.replica_states.get(&replica_id);
254        state.map(|s| s.replica.as_ref())
255    }
256    /// Get a replicator by replica id.
257    pub fn get_replicator(
258        &self,
259        replica_id: i64,
260    ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
261        let state = self.partition_state.replica_states.get(&replica_id);
262        state.map(|s| s.replicator.as_ref())
263    }
264    /// List all replica ids.
265    pub fn list_replica_ids(&self) -> Vec<i64> {
266        self.partition_state
267            .replica_states
268            .keys()
269            .cloned()
270            .collect()
271    }
272}
273
274// Workflow implementations.
275impl StatefulServicePartitionDriver {
276    /// Create a stateful service partition with the specified number of replicas.
277    /// The first replica is the primary.
278    /// Runs the replica build steps.
279    pub async fn create_service_partition(
280        &mut self,
281        desc: &CreateStatefulServicePartitionArg,
282    ) -> mssf_core::Result<()> {
283        assert!(desc.replica_count > 0);
284        assert!(self.partition_state.replica_states.is_empty());
285
286        let mut replicas = BTreeMap::new();
287        let mut replicators = BTreeMap::new();
288        let mut replica_addresses = BTreeMap::new();
289        let mut replicator_addresses = BTreeMap::new();
290        let mut replica_infos = BTreeMap::new();
291        let mut partitions = BTreeMap::new();
292
293        for _ in 0..desc.replica_count {
294            let id = self.next_replica_index();
295            let (factory_index, factory) = self.get_round_robin_factory();
296            let replica = factory
297                .create_replica(
298                    desc.service_type_name.clone(),
299                    desc.service_name.clone(),
300                    &desc.init_data,
301                    desc.partition_id,
302                    id,
303                )
304                .inspect_err(|e| {
305                    tracing::error!("Failed to create stateful service replica: {:?}", e)
306                })?;
307            let prev = replicas.insert(id, (factory_index, replica));
308            assert!(prev.is_none(), "Service replica already exists");
309        }
310
311        // open all replicas
312        for (id, (_, replica)) in &replicas {
313            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
314            // TODO: support other partition schemes.
315            let partition =
316                StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
317                    mssf_core::types::SingletonPartitionInformation {
318                        id: desc.partition_id,
319                    },
320                ));
321            let replctr = replica
322                .open(
323                    mssf_core::types::OpenMode::New,
324                    Arc::new(partition.clone()),
325                    cancellation_token,
326                )
327                .await?;
328            replicators.insert(*id, replctr);
329            partitions.insert(*id, partition);
330        }
331
332        // open all replicators
333        for (id, replctr) in &replicators {
334            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
335
336            let replctr_addr = replctr.open(cancellation_token).await?;
337            replicator_addresses.insert(*id, replctr_addr);
338        }
339
340        // assign roles to replicators. for simplicity, we assume the first replica is the primary.
341        let primary_index = 1;
342        let epoch = self.next_epoch_index();
343        for (id, rplctr) in &replicators {
344            let epoch_cp = epoch.clone();
345            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
346            if *id == primary_index {
347                rplctr
348                    .change_role(
349                        epoch_cp,
350                        mssf_core::types::ReplicaRole::Primary,
351                        cancellation_token,
352                    )
353                    .await?;
354                self.partition_state.primary_index = primary_index;
355            } else {
356                rplctr
357                    .change_role(
358                        epoch_cp,
359                        mssf_core::types::ReplicaRole::IdleSecondary,
360                        cancellation_token,
361                    )
362                    .await?;
363            }
364        }
365
366        // assign roles to replicas. First one is primary.
367        for (id, (_, replica)) in &replicas {
368            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
369            let replica_addr = if *id == self.partition_state.primary_index {
370                replica
371                    .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
372                    .await?
373            } else {
374                replica
375                    .change_role(
376                        mssf_core::types::ReplicaRole::IdleSecondary,
377                        cancellation_token,
378                    )
379                    .await?
380            };
381            replica_addresses.insert(*id, replica_addr);
382        }
383
384        // build secondaries.
385        let primary = replicators
386            .get(&self.partition_state.primary_index)
387            .unwrap();
388        for (id, (_, replica)) in &replicas {
389            if *id == self.partition_state.primary_index {
390                let replica_info = mssf_core::types::ReplicaInformation {
391                    replicator_address: replicator_addresses.get(id).unwrap().clone(),
392                    id: *id,
393                    role: mssf_core::types::ReplicaRole::Primary,
394                    status: mssf_core::types::ReplicaStatus::Up,
395                    current_progress: -1, // -1 for invalid. observed in sf logs.
396                    catch_up_capability: -1,
397                    must_catch_up: false,
398                };
399                replica_infos.insert(*id, replica_info);
400                continue;
401            }
402            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
403            let replica_info = mssf_core::types::ReplicaInformation {
404                replicator_address: replicator_addresses.get(id).unwrap().clone(),
405                id: *id,
406                role: mssf_core::types::ReplicaRole::IdleSecondary,
407                status: mssf_core::types::ReplicaStatus::Up,
408                current_progress: -1,
409                catch_up_capability: -1,
410                must_catch_up: false,
411            };
412            replica_infos.insert(*id, replica_info.clone());
413            primary
414                .build_replica(replica_info, cancellation_token)
415                .await?;
416            // change replicator role to active secondary after successful build.
417            let rplctr = replicators.get(id).unwrap();
418            rplctr
419                .change_role(
420                    epoch.clone(),
421                    mssf_core::types::ReplicaRole::ActiveSecondary,
422                    SimpleCancelToken::new_boxed(),
423                )
424                .await?;
425            // change replica role to active secondary after successful build.
426            replica
427                .change_role(
428                    mssf_core::types::ReplicaRole::ActiveSecondary,
429                    SimpleCancelToken::new_boxed(),
430                )
431                .await?;
432            // update the replica info
433            replica_infos.get_mut(id).unwrap().role =
434                mssf_core::types::ReplicaRole::ActiveSecondary;
435        }
436
437        // Run update catchup workflow for each secondary replica. Exclude primary.
438        let mut new_config = mssf_core::types::ReplicaSetConfig {
439            replicas: vec![],
440            write_quorum: 1, // for primary
441        };
442        // incase only primary exists, save the current configuration.
443        self.partition_state.current_configuration = new_config.clone();
444        let mut ready_replicas = 1;
445        for id in replicas.keys() {
446            if *id == self.partition_state.primary_index {
447                continue;
448            }
449            let prev_config = new_config.clone();
450            // construct new config
451            let replica_info = replica_infos.get(id).unwrap().clone();
452            new_config.replicas.push(replica_info);
453            ready_replicas += 1;
454            new_config.write_quorum = ready_replicas / 2 + 1_u32;
455
456            primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
457
458            // wait for catch up
459            primary
460                .wait_for_catch_up_quorum(
461                    mssf_core::types::ReplicaSetQuorumMode::Write,
462                    SimpleCancelToken::new_boxed(),
463                )
464                .await?;
465            // update current configuration
466            primary.update_current_replica_set_configuration(new_config.clone())?;
467            self.partition_state.current_configuration = new_config.clone();
468        }
469
470        // Update read write status.
471        // TODO: This might not be accurate.
472        // Maybe for primary it is always granted.
473        // Since the quorum size is increasing and no replica down during build process.
474        for (id, partition) in &partitions {
475            if *id == self.partition_state.primary_index {
476                partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
477                partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
478            } else {
479                partition
480                    .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
481                partition
482                    .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
483            }
484        }
485
486        // Save the state.
487        for (id, (factory_index, replica)) in replicas {
488            let state = StatefulServiceReplicaState {
489                replica,
490                replicator: replicators.remove(&id).unwrap(),
491                _replica_address: replica_addresses.remove(&id).unwrap(),
492                _replicator_address: replicator_addresses.remove(&id).unwrap(),
493                partition: partitions.remove(&id).unwrap(),
494                factory_index,
495            };
496            self.partition_state.replica_states.insert(id, state);
497        }
498        self.partition_state.epoch = epoch;
499        self.partition_state.static_info = Some(desc.clone());
500
501        self.check_partition_state();
502        Ok(())
503    }
504
505    /// Delete the service partition.
506    pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
507        // Not sure if the sequence is correct.
508
509        // Change read write status to pending
510        for state in self.partition_state.replica_states.values_mut() {
511            state.partition.set_read_status(
512                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
513            );
514            state.partition.set_write_status(
515                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
516            );
517        }
518
519        // Change primary to active secondary
520        let primary = self
521            .partition_state
522            .replica_states
523            .get_mut(&self.partition_state.primary_index)
524            .expect("Primary replica not found");
525
526        // Replicator change_role is called before Replica change_role.
527        primary
528            .replicator
529            .change_role(
530                self.partition_state.epoch.clone(), // Epoch is unchanged.
531                mssf_core::types::ReplicaRole::ActiveSecondary,
532                SimpleCancelToken::new_boxed(),
533            )
534            .await?;
535        primary
536            .replica
537            .change_role(
538                mssf_core::types::ReplicaRole::ActiveSecondary,
539                SimpleCancelToken::new_boxed(),
540            )
541            .await?;
542
543        // change role to none for all replicas
544        // Replicator change_role is called before Replica change_role.
545        for state in self.partition_state.replica_states.values_mut() {
546            state
547                .replicator
548                .change_role(
549                    self.partition_state.epoch.clone(), // Epoch is unchanged.
550                    mssf_core::types::ReplicaRole::None,
551                    SimpleCancelToken::new_boxed(),
552                )
553                .await?;
554            state
555                .replica
556                .change_role(
557                    mssf_core::types::ReplicaRole::None,
558                    SimpleCancelToken::new_boxed(),
559                )
560                .await?;
561        }
562
563        // close all replicas and replicators
564        for state in self.partition_state.replica_states.values_mut() {
565            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
566            state.replica.close(cancellation_token.clone()).await?;
567            state.replicator.close(cancellation_token).await?;
568        }
569
570        // clear the state
571        self.partition_state.replica_states.clear();
572        self.partition_state.static_info = None;
573        self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
574            replicas: vec![],
575            write_quorum: 0,
576        };
577        self.check_partition_state();
578        Ok(())
579    }
580
581    /// Restart a secondary replica gracefully.
582    pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
583        // check if replica exists
584        {
585            self.partition_state
586                .replica_states
587                .get_mut(&replica_id)
588                .ok_or_else(|| {
589                    mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
590                })?;
591            // check if it is not primary
592            if replica_id == self.partition_state.primary_index {
593                tracing::error!(
594                    "Replica {} is primary, cannot restart as secondary",
595                    replica_id
596                );
597                return Err(mssf_core::Error::from(
598                    mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
599                ));
600            }
601        }
602
603        // Update primary to remove the replica from the configuration.
604        {
605            let primary = self.get_primary_state().unwrap();
606            let current_config = self.partition_state.current_configuration.clone();
607            let replica_count = current_config.replicas.len();
608            let new_replicas = current_config
609                .replicas
610                .iter()
611                .filter(|r| r.id != replica_id)
612                .cloned()
613                .collect::<Vec<_>>();
614            let write_quorum = (replica_count as u32) / 2 + 1; // Note that quorum is not changing here during graceful restart.
615            let new_config = mssf_core::types::ReplicaSetConfig {
616                replicas: new_replicas,
617                write_quorum,
618            };
619            primary
620                .replicator
621                .update_current_replica_set_configuration(new_config.clone())?;
622            self.partition_state.current_configuration = new_config;
623        }
624
625        let prev_state = self
626            .partition_state
627            .replica_states
628            .remove(&replica_id)
629            .unwrap();
630        let factory_index = prev_state.factory_index;
631        // Close the Secondary, and cleanup. No change_role(None) since this is a restart (data is preserved).
632        {
633            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
634            prev_state
635                .replicator
636                .close(cancellation_token.clone())
637                .await?;
638            prev_state.replica.close(cancellation_token).await?;
639            drop(prev_state);
640        }
641
642        // Create replica existing from the same factory.
643        let factory = &*self.service_factory[factory_index as usize];
644        let replica = factory
645            .create_replica(
646                self.partition_state
647                    .static_info
648                    .as_ref()
649                    .unwrap()
650                    .service_type_name
651                    .clone(),
652                self.partition_state
653                    .static_info
654                    .as_ref()
655                    .unwrap()
656                    .service_name
657                    .clone(),
658                &self.partition_state.static_info.as_ref().unwrap().init_data,
659                self.partition_state
660                    .static_info
661                    .as_ref()
662                    .unwrap()
663                    .partition_id,
664                replica_id,
665            )
666            .inspect_err(|e| {
667                tracing::error!("Failed to create stateful service replica: {:?}", e)
668            })?;
669        // open the replica
670        let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
671            mssf_core::types::SingletonPartitionInformation {
672                id: self
673                    .partition_state
674                    .static_info
675                    .as_ref()
676                    .unwrap()
677                    .partition_id,
678            },
679        ));
680        // open existing replicator
681        let replctr = replica
682            .open(
683                mssf_core::types::OpenMode::Existing,
684                Arc::new(partition.clone()),
685                SimpleCancelToken::new_boxed(),
686            )
687            .await
688            .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
689        // open the replicator
690        let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
691        // change role to idle secondary
692        replctr
693            .change_role(
694                self.partition_state.epoch.clone(),
695                mssf_core::types::ReplicaRole::IdleSecondary,
696                SimpleCancelToken::new_boxed(),
697            )
698            .await?;
699        let replica_addr = replica
700            .change_role(
701                mssf_core::types::ReplicaRole::IdleSecondary,
702                SimpleCancelToken::new_boxed(),
703            )
704            .await?;
705
706        // build the replica again using the same id.
707        let primary = self.get_primary_state().unwrap();
708
709        let replica_info = mssf_core::types::ReplicaInformation {
710            replicator_address: replctr_addr.clone(),
711            id: replica_id,
712            role: mssf_core::types::ReplicaRole::IdleSecondary,
713            status: mssf_core::types::ReplicaStatus::Up,
714            current_progress: -1, // Observed value for restart.
715            catch_up_capability: -1,
716            must_catch_up: false,
717        };
718        primary
719            .replicator
720            .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
721            .await?;
722
723        // change role to active secondary after successful build.
724        // Replicator change_role is called before Replica change_role.
725        replctr
726            .change_role(
727                self.partition_state.epoch.clone(),
728                mssf_core::types::ReplicaRole::ActiveSecondary,
729                SimpleCancelToken::new_boxed(),
730            )
731            .await?;
732        replica
733            .change_role(
734                mssf_core::types::ReplicaRole::ActiveSecondary,
735                SimpleCancelToken::new_boxed(),
736            )
737            .await?;
738        // update the replica info
739        let mut updated_replica_info = replica_info.clone();
740        updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
741        // update catch up config again.
742        let prev_config = self.partition_state.current_configuration.clone();
743        let mut new_config_replicas = prev_config.replicas.clone();
744        new_config_replicas.push(updated_replica_info.clone());
745
746        let total_replica_count = new_config_replicas.len() + 1; // including primary
747        let write_quorum = (total_replica_count as u32) / 2 + 1;
748        let new_config = mssf_core::types::ReplicaSetConfig {
749            replicas: new_config_replicas,
750            write_quorum,
751        };
752        primary
753            .replicator
754            .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
755        // wait for catch up
756        primary
757            .replicator
758            .wait_for_catch_up_quorum(
759                mssf_core::types::ReplicaSetQuorumMode::Write,
760                SimpleCancelToken::new_boxed(),
761            )
762            .await?;
763        // update current configuration again.
764        primary
765            .replicator
766            .update_current_replica_set_configuration(new_config.clone())?;
767        self.partition_state.current_configuration = new_config;
768        // save the state
769        let state = StatefulServiceReplicaState {
770            replica,
771            replicator: replctr,
772            _replica_address: replica_addr,
773            _replicator_address: replctr_addr,
774            partition,
775            factory_index,
776        };
777        let prev = self
778            .partition_state
779            .replica_states
780            .insert(replica_id, state);
781        assert!(prev.is_none(), "Service replica already exists");
782        // done.
783        self.check_partition_state();
784        Ok(())
785    }
786}