mssf_util/mock/
stateful.rs

1// ------------------------------------------------------------
2// Copyright (c) Microsoft Corporation.  All rights reserved.
3// Licensed under the MIT License (MIT). See License.txt in the repo root for license information.
4// ------------------------------------------------------------
5
6use std::{
7    collections::HashMap,
8    sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12    GUID, WString,
13    runtime::IStatefulServicePartition,
14    sync::SimpleCancelToken,
15    types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20    info: mssf_core::types::ServicePartitionInformation,
21    read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22    write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26    pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27        Self {
28            info,
29            read_status: Arc::new(Mutex::new(
30                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31            )),
32            write_status: Arc::new(Mutex::new(
33                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34            )),
35        }
36    }
37    pub fn new_boxed(
38        info: mssf_core::types::ServicePartitionInformation,
39    ) -> Box<dyn IStatefulServicePartition> {
40        Box::new(Self::new(info))
41    }
42    pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43        *self.read_status.lock().unwrap() = status;
44    }
45    pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46        *self.write_status.lock().unwrap() = status;
47    }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51    fn create_replicator(
52        &self,
53    ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54        unimplemented!("Not implemented")
55    }
56
57    fn get_partition_information(
58        &self,
59    ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60        Ok(self.info.clone())
61    }
62
63    fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64        Ok(self.read_status.lock().unwrap().clone())
65    }
66
67    fn get_write_status(
68        &self,
69    ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70        Ok(self.write_status.lock().unwrap().clone())
71    }
72
73    fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74        Ok(())
75    }
76
77    fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78        Ok(())
79    }
80
81    fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82        Ok(())
83    }
84
85    fn report_partition_health(
86        &self,
87        _healthinfo: &mssf_core::types::HealthInformation,
88    ) -> mssf_core::Result<()> {
89        Ok(())
90    }
91
92    fn report_replica_health(
93        &self,
94        _healthinfo: &mssf_core::types::HealthInformation,
95    ) -> mssf_core::Result<()> {
96        Ok(())
97    }
98
99    fn try_get_com(
100        &self,
101    ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102        Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103    }
104}
105
/// Arguments describing the partition that
/// `StatefulServicePartitionDriver::create_service_partition` should create.
#[derive(Clone)]
pub struct CreateStatefulServicePartitionArg {
    /// Partition id passed to every created replica and used for the
    /// singleton partition information of each partition mock.
    pub partition_id: GUID,
    /// Number of replicas to create. Must be greater than zero.
    pub replica_count: usize,
    /// Initialization data passed to the factory for every replica.
    pub init_data: Vec<u8>,
    /// Service name passed to the factory for every replica.
    pub service_name: Uri,
    /// Service type name passed to the factory for every replica.
    pub service_type_name: WString,
}
114
/// Test driver for a single stateful service partition and its replicas.
///
/// Replicas are created from the registered service factories and driven
/// through open, role change, build and close calls by the workflow methods.
pub struct StatefulServicePartitionDriver {
    /// This keeps track of which factory to use next.
    factory_index: i64,
    /// Registered factories, used in round robin order when creating replicas.
    service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
    /// Next replica id to hand out. Ids are never reused by this driver.
    replica_index: i64,
    epoch_index: Epoch, // Used to generate new epoch.
    /// State of the partition currently managed by the driver.
    partition_state: PartitionState,
}
124
/// Bookkeeping for the partition managed by the driver.
struct PartitionState {
    /// Live replicas, keyed by replica id.
    pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
    /// Replica id of the current primary.
    pub primary_index: i64,
    /// Epoch handed to the replicators on role changes.
    pub epoch: Epoch,
    pub static_info: Option<CreateStatefulServicePartitionArg>, // Filled when created.
    /// Replica set configuration last given to the primary replicator.
    /// The replica list holds the secondaries only; the primary is not listed.
    pub current_configuration: mssf_core::types::ReplicaSetConfig,
}
132
/// Everything the driver keeps for one live replica.
struct StatefulServiceReplicaState {
    /// The service replica created by the factory.
    pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
    /// The replicator returned when the replica was opened.
    pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
    /// The partition mock handed to the replica on open; used to set its
    /// read and write status.
    pub partition: StatefulServicePartitionMock,
    pub factory_index: i64, // The index of the factory that created the replica
    /// Address returned by the replica's role change. Currently unused.
    pub _replica_address: WString,
    /// Address returned when the replicator was opened. Currently unused.
    pub _replicator_address: WString,
}
141
142impl Default for StatefulServicePartitionDriver {
143    fn default() -> Self {
144        Self::new()
145    }
146}
147
148impl StatefulServicePartitionDriver {
149    pub fn new() -> Self {
150        Self {
151            service_factory: Vec::new(),
152            factory_index: 0,
153            replica_index: 1, // replica id starting from 1
154            epoch_index: Epoch {
155                data_loss_number: 0,
156                configuration_number: 1,
157            },
158            partition_state: PartitionState {
159                replica_states: HashMap::new(),
160                primary_index: 1, // First replica is the primary.
161                epoch: Epoch {
162                    data_loss_number: 0,
163                    configuration_number: 1,
164                },
165                static_info: None,
166                current_configuration: mssf_core::types::ReplicaSetConfig {
167                    replicas: vec![],
168                    write_quorum: 0,
169                },
170            },
171        }
172    }
173
174    /// Register a service factory to be used to create replicas.
175    /// One should register multiple factories to simulate multi node scenarios.
176    /// Replicas are created in round robin fashion from the registered factories.
177    pub fn register_service_factory(
178        &mut self,
179        factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
180    ) {
181        self.service_factory.push(factory);
182    }
183
184    /// Get the next service factory in round robin fashion.
185    /// This ensures that multiple factories can be tested, to simulate
186    /// multi node scenarios.
187    /// Returns the current index and the factory.
188    fn get_round_robin_factory(
189        &mut self,
190    ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
191        assert!(!self.service_factory.is_empty());
192        let idx = self.factory_index as usize % self.service_factory.len();
193        self.factory_index += 1;
194        (idx as i64, &*self.service_factory[idx])
195    }
196
197    fn next_replica_index(&mut self) -> i64 {
198        let idx = self.replica_index;
199        self.replica_index += 1;
200        idx
201    }
202
203    fn next_epoch_index(&mut self) -> Epoch {
204        let idx = self.epoch_index.clone();
205        self.epoch_index.configuration_number += 1;
206        idx
207    }
208
209    fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
210        let state = self
211            .partition_state
212            .replica_states
213            .get(&self.partition_state.primary_index)
214            .ok_or_else(|| {
215                mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
216            })?;
217        Ok(state)
218    }
219
220    /// Check the invariants of the partition state.
221    /// Panics if any invariant is violated.
222    fn check_partition_state(&self) {
223        if self.partition_state.replica_states.is_empty() {
224            assert!(self.partition_state.static_info.is_none());
225            assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
226            assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
227            return;
228        }
229        // check primary exists
230        self.get_primary_state().unwrap();
231        // check quorum size matches
232        let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
233        assert_eq!(
234            self.partition_state.current_configuration.write_quorum,
235            expected_quorum
236        );
237    }
238}
239
240// Public Accessors
241impl StatefulServicePartitionDriver {
242    /// Get the current primary replica id.
243    pub fn get_primary_replica_id(&self) -> i64 {
244        self.partition_state.primary_index
245    }
246    /// Get a replica by id.
247    pub fn get_replica(
248        &self,
249        replica_id: i64,
250    ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
251        let state = self.partition_state.replica_states.get(&replica_id);
252        state.map(|s| s.replica.as_ref())
253    }
254    /// Get a replicator by replica id.
255    pub fn get_replicator(
256        &self,
257        replica_id: i64,
258    ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
259        let state = self.partition_state.replica_states.get(&replica_id);
260        state.map(|s| s.replicator.as_ref())
261    }
262    /// List all replica ids.
263    pub fn list_replica_ids(&self) -> Vec<i64> {
264        self.partition_state
265            .replica_states
266            .keys()
267            .cloned()
268            .collect()
269    }
270}
271
272// Workflow implementations.
273impl StatefulServicePartitionDriver {
274    /// Create a stateful service partition with the specified number of replicas.
275    /// The first replica is the primary.
276    /// Runs the replica build steps.
277    pub async fn create_service_partition(
278        &mut self,
279        desc: &CreateStatefulServicePartitionArg,
280    ) -> mssf_core::Result<()> {
281        assert!(desc.replica_count > 0);
282        assert!(self.partition_state.replica_states.is_empty());
283
284        let mut replicas = HashMap::new();
285        let mut replicators = HashMap::new();
286        let mut replica_addresses = HashMap::new();
287        let mut replicator_addresses = HashMap::new();
288        let mut replica_infos = HashMap::new();
289        let mut partitions = HashMap::new();
290
291        for _ in 0..desc.replica_count {
292            let id = self.next_replica_index();
293            let (factory_index, factory) = self.get_round_robin_factory();
294            let replica = factory
295                .create_replica(
296                    desc.service_type_name.clone(),
297                    desc.service_name.clone(),
298                    &desc.init_data,
299                    desc.partition_id,
300                    id,
301                )
302                .inspect_err(|e| {
303                    tracing::error!("Failed to create stateful service replica: {:?}", e)
304                })?;
305            let prev = replicas.insert(id, (factory_index, replica));
306            assert!(prev.is_none(), "Service replica already exists");
307        }
308
309        // open all replicas
310        for (id, (_, replica)) in &replicas {
311            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
312            // TODO: support other partition schemes.
313            let partition =
314                StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
315                    mssf_core::types::SingletonPartitionInformation {
316                        id: desc.partition_id,
317                    },
318                ));
319            let replctr = replica
320                .open(
321                    mssf_core::types::OpenMode::New,
322                    Arc::new(partition.clone()),
323                    cancellation_token,
324                )
325                .await?;
326            replicators.insert(*id, replctr);
327            partitions.insert(*id, partition);
328        }
329
330        // open all replicators
331        for (id, replctr) in &replicators {
332            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
333
334            let replctr_addr = replctr.open(cancellation_token).await?;
335            replicator_addresses.insert(*id, replctr_addr);
336        }
337
338        // assign roles to replicators. for simplicity, we assume the first replica is the primary.
339        let primary_index = 1;
340        let epoch = self.next_epoch_index();
341        for (id, rplctr) in &replicators {
342            let epoch_cp = epoch.clone();
343            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
344            if *id == primary_index {
345                rplctr
346                    .change_role(
347                        epoch_cp,
348                        mssf_core::types::ReplicaRole::Primary,
349                        cancellation_token,
350                    )
351                    .await?;
352                self.partition_state.primary_index = primary_index;
353            } else {
354                rplctr
355                    .change_role(
356                        epoch_cp,
357                        mssf_core::types::ReplicaRole::IdleSecondary,
358                        cancellation_token,
359                    )
360                    .await?;
361            }
362        }
363
364        // assign roles to replicas. First one is primary.
365        for (id, (_, replica)) in &replicas {
366            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
367            let replica_addr = if *id == self.partition_state.primary_index {
368                replica
369                    .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
370                    .await?
371            } else {
372                replica
373                    .change_role(
374                        mssf_core::types::ReplicaRole::IdleSecondary,
375                        cancellation_token,
376                    )
377                    .await?
378            };
379            replica_addresses.insert(*id, replica_addr);
380        }
381
382        // build secondaries.
383        let primary = replicators
384            .get(&self.partition_state.primary_index)
385            .unwrap();
386        for (id, (_, replica)) in &replicas {
387            if *id == self.partition_state.primary_index {
388                let replica_info = mssf_core::types::ReplicaInformation {
389                    replicator_address: replicator_addresses.get(id).unwrap().clone(),
390                    id: *id,
391                    role: mssf_core::types::ReplicaRole::Primary,
392                    status: mssf_core::types::ReplicaStatus::Up,
393                    current_progress: -1, // -1 for invalid. observed in sf logs.
394                    catch_up_capability: -1,
395                    must_catch_up: false,
396                };
397                replica_infos.insert(*id, replica_info);
398                continue;
399            }
400            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
401            let replica_info = mssf_core::types::ReplicaInformation {
402                replicator_address: replicator_addresses.get(id).unwrap().clone(),
403                id: *id,
404                role: mssf_core::types::ReplicaRole::IdleSecondary,
405                status: mssf_core::types::ReplicaStatus::Up,
406                current_progress: -1,
407                catch_up_capability: -1,
408                must_catch_up: false,
409            };
410            replica_infos.insert(*id, replica_info.clone());
411            primary
412                .build_replica(replica_info, cancellation_token)
413                .await?;
414            // change role to active secondary after successful build.
415            replica
416                .change_role(
417                    mssf_core::types::ReplicaRole::ActiveSecondary,
418                    SimpleCancelToken::new_boxed(),
419                )
420                .await?;
421            // update the replica info
422            replica_infos.get_mut(id).unwrap().role =
423                mssf_core::types::ReplicaRole::ActiveSecondary;
424        }
425
426        // Run update catchup workflow for each secondary replica. Exclude primary.
427        let mut new_config = mssf_core::types::ReplicaSetConfig {
428            replicas: vec![],
429            write_quorum: 1, // for primary
430        };
431        let mut ready_replicas = 1;
432        for id in replicas.keys() {
433            if *id == self.partition_state.primary_index {
434                continue;
435            }
436            let prev_config = new_config.clone();
437            // construct new config
438            let replica_info = replica_infos.get(id).unwrap().clone();
439            new_config.replicas.push(replica_info);
440            ready_replicas += 1;
441            new_config.write_quorum = ready_replicas / 2 + 1_u32;
442
443            primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
444
445            // wait for catch up
446            primary
447                .wait_for_catch_up_quorum(
448                    mssf_core::types::ReplicaSetQuorumMode::Write,
449                    SimpleCancelToken::new_boxed(),
450                )
451                .await?;
452            // update current configuration
453            primary.update_current_replica_set_configuration(new_config.clone())?;
454            self.partition_state.current_configuration = new_config.clone();
455        }
456
457        // Update read write status.
458        // TODO: This might not be accurate.
459        // Maybe for primary it is always granted.
460        // Since the quorum size is increasing and no replica down during build process.
461        for (id, partition) in &partitions {
462            if *id == self.partition_state.primary_index {
463                partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
464                partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
465            } else {
466                partition
467                    .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
468                partition
469                    .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
470            }
471        }
472
473        // Save the state.
474        for (id, (factory_index, replica)) in replicas {
475            let state = StatefulServiceReplicaState {
476                replica,
477                replicator: replicators.remove(&id).unwrap(),
478                _replica_address: replica_addresses.remove(&id).unwrap(),
479                _replicator_address: replicator_addresses.remove(&id).unwrap(),
480                partition: partitions.remove(&id).unwrap(),
481                factory_index,
482            };
483            self.partition_state.replica_states.insert(id, state);
484        }
485        self.partition_state.epoch = epoch;
486        self.partition_state.static_info = Some(desc.clone());
487
488        self.check_partition_state();
489        Ok(())
490    }
491
492    /// Delete the service partition.
493    pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
494        // Not sure if the sequence is correct.
495
496        // Change read write status to pending
497        for state in self.partition_state.replica_states.values_mut() {
498            state.partition.set_read_status(
499                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
500            );
501            state.partition.set_write_status(
502                mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
503            );
504        }
505
506        // Change primary to active secondary
507        let primary = self
508            .partition_state
509            .replica_states
510            .get_mut(&1)
511            .expect("Primary replica not found");
512
513        let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
514        primary
515            .replica
516            .change_role(
517                mssf_core::types::ReplicaRole::ActiveSecondary,
518                cancellation_token,
519            )
520            .await?;
521        primary
522            .replicator
523            .change_role(
524                self.partition_state.epoch.clone(), // Epoch is unchanged.
525                mssf_core::types::ReplicaRole::ActiveSecondary,
526                SimpleCancelToken::new_boxed(),
527            )
528            .await?;
529
530        // change role to none for all replicas
531        for state in self.partition_state.replica_states.values_mut() {
532            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
533            state
534                .replica
535                .change_role(mssf_core::types::ReplicaRole::None, cancellation_token)
536                .await?;
537            state
538                .replicator
539                .change_role(
540                    self.partition_state.epoch.clone(), // Epoch is unchanged.
541                    mssf_core::types::ReplicaRole::None,
542                    SimpleCancelToken::new_boxed(),
543                )
544                .await?;
545        }
546
547        // close all replicas and replicators
548        for state in self.partition_state.replica_states.values_mut() {
549            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
550            state.replica.close(cancellation_token.clone()).await?;
551            state.replicator.close(cancellation_token).await?;
552        }
553
554        // clear the state
555        self.partition_state.replica_states.clear();
556        self.partition_state.static_info = None;
557        self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
558            replicas: vec![],
559            write_quorum: 0,
560        };
561        self.check_partition_state();
562        Ok(())
563    }
564
565    /// Restart a secondary replica gracefully.
566    pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
567        // check if replica exists
568        {
569            self.partition_state
570                .replica_states
571                .get_mut(&replica_id)
572                .ok_or_else(|| {
573                    mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
574                })?;
575            // check if it is not primary
576            if replica_id == self.partition_state.primary_index {
577                tracing::error!(
578                    "Replica {} is primary, cannot restart as secondary",
579                    replica_id
580                );
581                return Err(mssf_core::Error::from(
582                    mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
583                ));
584            }
585        }
586
587        // Update primary to remove the replica from the configuration.
588        {
589            let primary = self.get_primary_state().unwrap();
590            let current_config = self.partition_state.current_configuration.clone();
591            let replica_count = current_config.replicas.len();
592            let new_replicas = current_config
593                .replicas
594                .iter()
595                .filter(|r| r.id != replica_id)
596                .cloned()
597                .collect::<Vec<_>>();
598            let write_quorum = (replica_count as u32) / 2 + 1; // Note that quorum is not changing here during graceful restart.
599            let new_config = mssf_core::types::ReplicaSetConfig {
600                replicas: new_replicas,
601                write_quorum,
602            };
603            primary
604                .replicator
605                .update_current_replica_set_configuration(new_config.clone())?;
606            self.partition_state.current_configuration = new_config;
607        }
608
609        let prev_state = self
610            .partition_state
611            .replica_states
612            .remove(&replica_id)
613            .unwrap();
614        let factory_index = prev_state.factory_index;
615        // Close the Secondary, and cleanup.
616        {
617            let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
618            prev_state.replica.close(cancellation_token.clone()).await?;
619            prev_state.replicator.close(cancellation_token).await?;
620            drop(prev_state);
621        }
622
623        // Create replica existing from the same factory.
624        let factory = &*self.service_factory[factory_index as usize];
625        let replica = factory
626            .create_replica(
627                self.partition_state
628                    .static_info
629                    .as_ref()
630                    .unwrap()
631                    .service_type_name
632                    .clone(),
633                self.partition_state
634                    .static_info
635                    .as_ref()
636                    .unwrap()
637                    .service_name
638                    .clone(),
639                &self.partition_state.static_info.as_ref().unwrap().init_data,
640                self.partition_state
641                    .static_info
642                    .as_ref()
643                    .unwrap()
644                    .partition_id,
645                replica_id,
646            )
647            .inspect_err(|e| {
648                tracing::error!("Failed to create stateful service replica: {:?}", e)
649            })?;
650        // open the replica
651        let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
652            mssf_core::types::SingletonPartitionInformation {
653                id: self
654                    .partition_state
655                    .static_info
656                    .as_ref()
657                    .unwrap()
658                    .partition_id,
659            },
660        ));
661        // open existing replicator
662        let replctr = replica
663            .open(
664                mssf_core::types::OpenMode::Existing,
665                Arc::new(partition.clone()),
666                SimpleCancelToken::new_boxed(),
667            )
668            .await
669            .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
670        // open the replicator
671        let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
672        // change role to idle secondary
673        replctr
674            .change_role(
675                self.partition_state.epoch.clone(),
676                mssf_core::types::ReplicaRole::IdleSecondary,
677                SimpleCancelToken::new_boxed(),
678            )
679            .await?;
680        let replica_addr = replica
681            .change_role(
682                mssf_core::types::ReplicaRole::IdleSecondary,
683                SimpleCancelToken::new_boxed(),
684            )
685            .await?;
686
687        // build the replica again using the same id.
688        let primary = self.get_primary_state().unwrap();
689
690        let replica_info = mssf_core::types::ReplicaInformation {
691            replicator_address: replctr_addr.clone(),
692            id: replica_id,
693            role: mssf_core::types::ReplicaRole::IdleSecondary,
694            status: mssf_core::types::ReplicaStatus::Up,
695            current_progress: -1, // Observed value for restart.
696            catch_up_capability: -1,
697            must_catch_up: false,
698        };
699        primary
700            .replicator
701            .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
702            .await?;
703
704        // change role to active secondary after successful build.
705        replica
706            .change_role(
707                mssf_core::types::ReplicaRole::ActiveSecondary,
708                SimpleCancelToken::new_boxed(),
709            )
710            .await?;
711        // update the replica info
712        let mut updated_replica_info = replica_info.clone();
713        updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
714        // update catch up config again.
715        let prev_config = self.partition_state.current_configuration.clone();
716        let mut new_replicas = prev_config.replicas.clone();
717        new_replicas.push(updated_replica_info.clone());
718        let write_quorum = (new_replicas.len() as u32) / 2 + 1;
719        let new_config = mssf_core::types::ReplicaSetConfig {
720            replicas: new_replicas,
721            write_quorum,
722        };
723        primary
724            .replicator
725            .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
726        // wait for catch up
727        primary
728            .replicator
729            .wait_for_catch_up_quorum(
730                mssf_core::types::ReplicaSetQuorumMode::Write,
731                SimpleCancelToken::new_boxed(),
732            )
733            .await?;
734        // update current configuration again.
735        primary
736            .replicator
737            .update_current_replica_set_configuration(new_config.clone())?;
738        self.partition_state.current_configuration = new_config;
739        // save the state
740        let state = StatefulServiceReplicaState {
741            replica,
742            replicator: replctr,
743            _replica_address: replica_addr,
744            _replicator_address: replctr_addr,
745            partition,
746            factory_index,
747        };
748        let prev = self
749            .partition_state
750            .replica_states
751            .insert(replica_id, state);
752        assert!(prev.is_none(), "Service replica already exists");
753        // done.
754        self.check_partition_state();
755        Ok(())
756    }
757}