mssf_util/mock/
stateful.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7    collections::HashMap,
8    sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12    GUID, WString,
13    runtime::IStatefulServicePartition,
14    sync::SimpleCancelToken,
15    types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20    info: mssf_core::types::ServicePartitionInformation,
21    read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22    write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26    pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27        Self {
28            info,
29            read_status: Arc::new(Mutex::new(
30                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31            )),
32            write_status: Arc::new(Mutex::new(
33                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34            )),
35        }
36    }
37    pub fn new_boxed(
38        info: mssf_core::types::ServicePartitionInformation,
39    ) -> Box<dyn IStatefulServicePartition> {
40        Box::new(Self::new(info))
41    }
42    pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43        *self.read_status.lock().unwrap() = status;
44    }
45    pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46        *self.write_status.lock().unwrap() = status;
47    }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51    fn create_replicator(
52        &self,
53    ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54        unimplemented!("Not implemented")
55    }
56
57    fn get_partition_information(
58        &self,
59    ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60        Ok(self.info.clone())
61    }
62
63    fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64        Ok(self.read_status.lock().unwrap().clone())
65    }
66
67    fn get_write_status(
68        &self,
69    ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70        Ok(self.write_status.lock().unwrap().clone())
71    }
72
73    fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74        Ok(())
75    }
76
77    fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78        Ok(())
79    }
80
81    fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82        Ok(())
83    }
84
85    fn report_partition_health(
86        &self,
87        _healthinfo: &mssf_core::types::HealthInformation,
88    ) -> mssf_core::Result<()> {
89        Ok(())
90    }
91
92    fn report_replica_health(
93        &self,
94        _healthinfo: &mssf_core::types::HealthInformation,
95    ) -> mssf_core::Result<()> {
96        Ok(())
97    }
98
99    fn try_get_com(
100        &self,
101    ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102        Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103    }
104}
105
106#[derive(Clone)]
107pub struct CreateStatefulServicePartitionArg {
108    pub partition_id: GUID,
109    pub replica_count: usize,
110    pub init_data: Vec<u8>,
111    pub service_name: Uri,
112    pub service_type_name: WString,
113}
114
115/// Test driver for a single stateful service replica.
116pub struct StatefulServicePartitionDriver {
117    /// This keeps track of which factory to use next.
118    factory_index: i64,
119    service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
120    replica_index: i64,
121    epoch_index: Epoch, // Used to generate new epoch.
122    partition_state: PartitionState,
123}
124
125struct PartitionState {
126    // states for all replicas.
127    pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
128    pub primary_index: i64,
129    pub epoch: Epoch,
130    pub static_info: Option<CreateStatefulServicePartitionArg>, // Filled when created.
131    // Write quorum and secondary replica list.
132    pub current_configuration: mssf_core::types::ReplicaSetConfig,
133}
134
135struct StatefulServiceReplicaState {
136    pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
137    pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
138    pub partition: StatefulServicePartitionMock,
139    pub factory_index: i64, // The index of the factory that created the replica
140    pub _replica_address: WString,
141    pub _replicator_address: WString,
142}
143
144impl Default for StatefulServicePartitionDriver {
145    fn default() -> Self {
146        Self::new()
147    }
148}
149
150impl StatefulServicePartitionDriver {
151    pub fn new() -> Self {
152        Self {
153            service_factory: Vec::new(),
154            factory_index: 0,
155            replica_index: 1, // replica id starting from 1
156            epoch_index: Epoch {
157                data_loss_number: 0,
158                configuration_number: 1,
159            },
160            partition_state: PartitionState {
161                replica_states: HashMap::new(),
162                primary_index: 1, // First replica is the primary.
163                epoch: Epoch {
164                    data_loss_number: 0,
165                    configuration_number: 1,
166                },
167                static_info: None,
168                current_configuration: mssf_core::types::ReplicaSetConfig {
169                    replicas: vec![],
170                    write_quorum: 0,
171                },
172            },
173        }
174    }
175
176    /// Register a service factory to be used to create replicas.
177    /// One should register multiple factories to simulate multi node scenarios.
178    /// Replicas are created in round robin fashion from the registered factories.
179    pub fn register_service_factory(
180        &mut self,
181        factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
182    ) {
183        self.service_factory.push(factory);
184    }
185
186    /// Get the next service factory in round robin fashion.
187    /// This ensures that multiple factories can be tested, to simulate
188    /// multi node scenarios.
189    /// Returns the current index and the factory.
190    fn get_round_robin_factory(
191        &mut self,
192    ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
193        assert!(!self.service_factory.is_empty());
194        let idx = self.factory_index as usize % self.service_factory.len();
195        self.factory_index += 1;
196        (idx as i64, &*self.service_factory[idx])
197    }
198
199    fn next_replica_index(&mut self) -> i64 {
200        let idx = self.replica_index;
201        self.replica_index += 1;
202        idx
203    }
204
205    fn next_epoch_index(&mut self) -> Epoch {
206        let idx = self.epoch_index.clone();
207        self.epoch_index.configuration_number += 1;
208        idx
209    }
210
211    fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
212        let state = self
213            .partition_state
214            .replica_states
215            .get(&self.partition_state.primary_index)
216            .ok_or_else(|| {
217                mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
218            })?;
219        Ok(state)
220    }
221
222    /// Check the invariants of the partition state.
223    /// Panics if any invariant is violated.
224    fn check_partition_state(&self) {
225        if self.partition_state.replica_states.is_empty() {
226            assert!(self.partition_state.static_info.is_none());
227            assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
228            assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
229            return;
230        }
231        // check primary exists
232        self.get_primary_state().unwrap();
233        // check quorum size matches
234        let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
235        assert_eq!(
236            self.partition_state.current_configuration.write_quorum,
237            expected_quorum
238        );
239    }
240}
241
242// Public Accessors
243impl StatefulServicePartitionDriver {
244    /// Get the current primary replica id.
245    pub fn get_primary_replica_id(&self) -> i64 {
246        self.partition_state.primary_index
247    }
248    /// Get a replica by id.
249    pub fn get_replica(
250        &self,
251        replica_id: i64,
252    ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
253        let state = self.partition_state.replica_states.get(&replica_id);
254        state.map(|s| s.replica.as_ref())
255    }
256    /// Get a replicator by replica id.
257    pub fn get_replicator(
258        &self,
259        replica_id: i64,
260    ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
261        let state = self.partition_state.replica_states.get(&replica_id);
262        state.map(|s| s.replicator.as_ref())
263    }
264    /// List all replica ids.
265    pub fn list_replica_ids(&self) -> Vec<i64> {
266        self.partition_state
267            .replica_states
268            .keys()
269            .cloned()
270            .collect()
271    }
272}
273
274// Workflow implementations.
275impl StatefulServicePartitionDriver {
276    /// Create a stateful service partition with the specified number of replicas.
277    /// The first replica is the primary.
278    /// Runs the replica build steps.
279    pub async fn create_service_partition(
280        &mut self,
281        desc: &CreateStatefulServicePartitionArg,
282    ) -> mssf_core::Result<()> {
283        assert!(desc.replica_count > 0);
284        assert!(self.partition_state.replica_states.is_empty());
285
286        let mut replicas = HashMap::new();
287        let mut replicators = HashMap::new();
288        let mut replica_addresses = HashMap::new();
289        let mut replicator_addresses = HashMap::new();
290        let mut replica_infos = HashMap::new();
291        let mut partitions = HashMap::new();
292
293        for _ in 0..desc.replica_count {
294            let id = self.next_replica_index();
295            let (factory_index, factory) = self.get_round_robin_factory();
296            let replica = factory
297                .create_replica(
298                    desc.service_type_name.clone(),
299                    desc.service_name.clone(),
300                    &desc.init_data,
301                    desc.partition_id,
302                    id,
303                )
304                .inspect_err(|e| {
305                    tracing::error!("Failed to create stateful service replica: {:?}", e)
306                })?;
307            let prev = replicas.insert(id, (factory_index, replica));
308            assert!(prev.is_none(), "Service replica already exists");
309        }
310
311        // open all replicas
312        for (id, (_, replica)) in &replicas {
313            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
314            // TODO: support other partition schemes.
315            let partition =
316                StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
317                    mssf_core::types::SingletonPartitionInformation {
318                        id: desc.partition_id,
319                    },
320                ));
321            let replctr = replica
322                .open(
323                    mssf_core::types::OpenMode::New,
324                    Arc::new(partition.clone()),
325                    cancellation_token,
326                )
327                .await?;
328            replicators.insert(*id, replctr);
329            partitions.insert(*id, partition);
330        }
331
332        // open all replicators
333        for (id, replctr) in &replicators {
334            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
335
336            let replctr_addr = replctr.open(cancellation_token).await?;
337            replicator_addresses.insert(*id, replctr_addr);
338        }
339
340        // assign roles to replicators. for simplicity, we assume the first replica is the primary.
341        let primary_index = 1;
342        let epoch = self.next_epoch_index();
343        for (id, rplctr) in &replicators {
344            let epoch_cp = epoch.clone();
345            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
346            if *id == primary_index {
347                rplctr
348                    .change_role(
349                        epoch_cp,
350                        mssf_core::types::ReplicaRole::Primary,
351                        cancellation_token,
352                    )
353                    .await?;
354                self.partition_state.primary_index = primary_index;
355            } else {
356                rplctr
357                    .change_role(
358                        epoch_cp,
359                        mssf_core::types::ReplicaRole::IdleSecondary,
360                        cancellation_token,
361                    )
362                    .await?;
363            }
364        }
365
366        // assign roles to replicas. First one is primary.
367        for (id, (_, replica)) in &replicas {
368            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
369            let replica_addr = if *id == self.partition_state.primary_index {
370                replica
371                    .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
372                    .await?
373            } else {
374                replica
375                    .change_role(
376                        mssf_core::types::ReplicaRole::IdleSecondary,
377                        cancellation_token,
378                    )
379                    .await?
380            };
381            replica_addresses.insert(*id, replica_addr);
382        }
383
384        // build secondaries.
385        let primary = replicators
386            .get(&self.partition_state.primary_index)
387            .unwrap();
388        for (id, (_, replica)) in &replicas {
389            if *id == self.partition_state.primary_index {
390                let replica_info = mssf_core::types::ReplicaInformation {
391                    replicator_address: replicator_addresses.get(id).unwrap().clone(),
392                    id: *id,
393                    role: mssf_core::types::ReplicaRole::Primary,
394                    status: mssf_core::types::ReplicaStatus::Up,
395                    current_progress: -1, // -1 for invalid. observed in sf logs.
396                    catch_up_capability: -1,
397                    must_catch_up: false,
398                };
399                replica_infos.insert(*id, replica_info);
400                continue;
401            }
402            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
403            let replica_info = mssf_core::types::ReplicaInformation {
404                replicator_address: replicator_addresses.get(id).unwrap().clone(),
405                id: *id,
406                role: mssf_core::types::ReplicaRole::IdleSecondary,
407                status: mssf_core::types::ReplicaStatus::Up,
408                current_progress: -1,
409                catch_up_capability: -1,
410                must_catch_up: false,
411            };
412            replica_infos.insert(*id, replica_info.clone());
413            primary
414                .build_replica(replica_info, cancellation_token)
415                .await?;
416            // change role to active secondary after successful build.
417            replica
418                .change_role(
419                    mssf_core::types::ReplicaRole::ActiveSecondary,
420                    SimpleCancelToken::new_boxed(),
421                )
422                .await?;
423            // update the replica info
424            replica_infos.get_mut(id).unwrap().role =
425                mssf_core::types::ReplicaRole::ActiveSecondary;
426        }
427
428        // Run update catchup workflow for each secondary replica. Exclude primary.
429        let mut new_config = mssf_core::types::ReplicaSetConfig {
430            replicas: vec![],
431            write_quorum: 1, // for primary
432        };
433        // incase only primary exists, save the current configuration.
434        self.partition_state.current_configuration = new_config.clone();
435        let mut ready_replicas = 1;
436        for id in replicas.keys() {
437            if *id == self.partition_state.primary_index {
438                continue;
439            }
440            let prev_config = new_config.clone();
441            // construct new config
442            let replica_info = replica_infos.get(id).unwrap().clone();
443            new_config.replicas.push(replica_info);
444            ready_replicas += 1;
445            new_config.write_quorum = ready_replicas / 2 + 1_u32;
446
447            primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
448
449            // wait for catch up
450            primary
451                .wait_for_catch_up_quorum(
452                    mssf_core::types::ReplicaSetQuorumMode::Write,
453                    SimpleCancelToken::new_boxed(),
454                )
455                .await?;
456            // update current configuration
457            primary.update_current_replica_set_configuration(new_config.clone())?;
458            self.partition_state.current_configuration = new_config.clone();
459        }
460
461        // Update read write status.
462        // TODO: This might not be accurate.
463        // Maybe for primary it is always granted.
464        // Since the quorum size is increasing and no replica down during build process.
465        for (id, partition) in &partitions {
466            if *id == self.partition_state.primary_index {
467                partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
468                partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
469            } else {
470                partition
471                    .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
472                partition
473                    .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
474            }
475        }
476
477        // Save the state.
478        for (id, (factory_index, replica)) in replicas {
479            let state = StatefulServiceReplicaState {
480                replica,
481                replicator: replicators.remove(&id).unwrap(),
482                _replica_address: replica_addresses.remove(&id).unwrap(),
483                _replicator_address: replicator_addresses.remove(&id).unwrap(),
484                partition: partitions.remove(&id).unwrap(),
485                factory_index,
486            };
487            self.partition_state.replica_states.insert(id, state);
488        }
489        self.partition_state.epoch = epoch;
490        self.partition_state.static_info = Some(desc.clone());
491
492        self.check_partition_state();
493        Ok(())
494    }
495
496    /// Delete the service partition.
497    pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
498        // Not sure if the sequence is correct.
499
500        // Change read write status to pending
501        for state in self.partition_state.replica_states.values_mut() {
502            state.partition.set_read_status(
503                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
504            );
505            state.partition.set_write_status(
506                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
507            );
508        }
509
510        // Change primary to active secondary
511        let primary = self
512            .partition_state
513            .replica_states
514            .get_mut(&1)
515            .expect("Primary replica not found");
516
517        let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
518        primary
519            .replica
520            .change_role(
521                mssf_core::types::ReplicaRole::ActiveSecondary,
522                cancellation_token,
523            )
524            .await?;
525        primary
526            .replicator
527            .change_role(
528                self.partition_state.epoch.clone(), // Epoch is unchanged.
529                mssf_core::types::ReplicaRole::ActiveSecondary,
530                SimpleCancelToken::new_boxed(),
531            )
532            .await?;
533
534        // change role to none for all replicas
535        for state in self.partition_state.replica_states.values_mut() {
536            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
537            state
538                .replica
539                .change_role(mssf_core::types::ReplicaRole::None, cancellation_token)
540                .await?;
541            state
542                .replicator
543                .change_role(
544                    self.partition_state.epoch.clone(), // Epoch is unchanged.
545                    mssf_core::types::ReplicaRole::None,
546                    SimpleCancelToken::new_boxed(),
547                )
548                .await?;
549        }
550
551        // close all replicas and replicators
552        for state in self.partition_state.replica_states.values_mut() {
553            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
554            state.replica.close(cancellation_token.clone()).await?;
555            state.replicator.close(cancellation_token).await?;
556        }
557
558        // clear the state
559        self.partition_state.replica_states.clear();
560        self.partition_state.static_info = None;
561        self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
562            replicas: vec![],
563            write_quorum: 0,
564        };
565        self.check_partition_state();
566        Ok(())
567    }
568
569    /// Restart a secondary replica gracefully.
570    pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
571        // check if replica exists
572        {
573            self.partition_state
574                .replica_states
575                .get_mut(&replica_id)
576                .ok_or_else(|| {
577                    mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
578                })?;
579            // check if it is not primary
580            if replica_id == self.partition_state.primary_index {
581                tracing::error!(
582                    "Replica {} is primary, cannot restart as secondary",
583                    replica_id
584                );
585                return Err(mssf_core::Error::from(
586                    mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
587                ));
588            }
589        }
590
591        // Update primary to remove the replica from the configuration.
592        {
593            let primary = self.get_primary_state().unwrap();
594            let current_config = self.partition_state.current_configuration.clone();
595            let replica_count = current_config.replicas.len();
596            let new_replicas = current_config
597                .replicas
598                .iter()
599                .filter(|r| r.id != replica_id)
600                .cloned()
601                .collect::<Vec<_>>();
602            let write_quorum = (replica_count as u32) / 2 + 1; // Note that quorum is not changing here during graceful restart.
603            let new_config = mssf_core::types::ReplicaSetConfig {
604                replicas: new_replicas,
605                write_quorum,
606            };
607            primary
608                .replicator
609                .update_current_replica_set_configuration(new_config.clone())?;
610            self.partition_state.current_configuration = new_config;
611        }
612
613        let prev_state = self
614            .partition_state
615            .replica_states
616            .remove(&replica_id)
617            .unwrap();
618        let factory_index = prev_state.factory_index;
619        // Close the Secondary, and cleanup.
620        {
621            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
622            prev_state.replica.close(cancellation_token.clone()).await?;
623            prev_state.replicator.close(cancellation_token).await?;
624            drop(prev_state);
625        }
626
627        // Create replica existing from the same factory.
628        let factory = &*self.service_factory[factory_index as usize];
629        let replica = factory
630            .create_replica(
631                self.partition_state
632                    .static_info
633                    .as_ref()
634                    .unwrap()
635                    .service_type_name
636                    .clone(),
637                self.partition_state
638                    .static_info
639                    .as_ref()
640                    .unwrap()
641                    .service_name
642                    .clone(),
643                &self.partition_state.static_info.as_ref().unwrap().init_data,
644                self.partition_state
645                    .static_info
646                    .as_ref()
647                    .unwrap()
648                    .partition_id,
649                replica_id,
650            )
651            .inspect_err(|e| {
652                tracing::error!("Failed to create stateful service replica: {:?}", e)
653            })?;
654        // open the replica
655        let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
656            mssf_core::types::SingletonPartitionInformation {
657                id: self
658                    .partition_state
659                    .static_info
660                    .as_ref()
661                    .unwrap()
662                    .partition_id,
663            },
664        ));
665        // open existing replicator
666        let replctr = replica
667            .open(
668                mssf_core::types::OpenMode::Existing,
669                Arc::new(partition.clone()),
670                SimpleCancelToken::new_boxed(),
671            )
672            .await
673            .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
674        // open the replicator
675        let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
676        // change role to idle secondary
677        replctr
678            .change_role(
679                self.partition_state.epoch.clone(),
680                mssf_core::types::ReplicaRole::IdleSecondary,
681                SimpleCancelToken::new_boxed(),
682            )
683            .await?;
684        let replica_addr = replica
685            .change_role(
686                mssf_core::types::ReplicaRole::IdleSecondary,
687                SimpleCancelToken::new_boxed(),
688            )
689            .await?;
690
691        // build the replica again using the same id.
692        let primary = self.get_primary_state().unwrap();
693
694        let replica_info = mssf_core::types::ReplicaInformation {
695            replicator_address: replctr_addr.clone(),
696            id: replica_id,
697            role: mssf_core::types::ReplicaRole::IdleSecondary,
698            status: mssf_core::types::ReplicaStatus::Up,
699            current_progress: -1, // Observed value for restart.
700            catch_up_capability: -1,
701            must_catch_up: false,
702        };
703        primary
704            .replicator
705            .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
706            .await?;
707
708        // change role to active secondary after successful build.
709        replica
710            .change_role(
711                mssf_core::types::ReplicaRole::ActiveSecondary,
712                SimpleCancelToken::new_boxed(),
713            )
714            .await?;
715        // update the replica info
716        let mut updated_replica_info = replica_info.clone();
717        updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
718        // update catch up config again.
719        let prev_config = self.partition_state.current_configuration.clone();
720        let mut new_config_replicas = prev_config.replicas.clone();
721        new_config_replicas.push(updated_replica_info.clone());
722
723        let total_replica_count = new_config_replicas.len() + 1; // including primary
724        let write_quorum = (total_replica_count as u32) / 2 + 1;
725        let new_config = mssf_core::types::ReplicaSetConfig {
726            replicas: new_config_replicas,
727            write_quorum,
728        };
729        primary
730            .replicator
731            .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
732        // wait for catch up
733        primary
734            .replicator
735            .wait_for_catch_up_quorum(
736                mssf_core::types::ReplicaSetQuorumMode::Write,
737                SimpleCancelToken::new_boxed(),
738            )
739            .await?;
740        // update current configuration again.
741        primary
742            .replicator
743            .update_current_replica_set_configuration(new_config.clone())?;
744        self.partition_state.current_configuration = new_config;
745        // save the state
746        let state = StatefulServiceReplicaState {
747            replica,
748            replicator: replctr,
749            _replica_address: replica_addr,
750            _replicator_address: replctr_addr,
751            partition,
752            factory_index,
753        };
754        let prev = self
755            .partition_state
756            .replica_states
757            .insert(replica_id, state);
758        assert!(prev.is_none(), "Service replica already exists");
759        // done.
760        self.check_partition_state();
761        Ok(())
762    }
763}