1use std::{
7 collections::HashMap,
8 sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12 GUID, WString,
13 runtime::IStatefulServicePartition,
14 sync::SimpleCancelToken,
15 types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20 info: mssf_core::types::ServicePartitionInformation,
21 read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22 write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26 pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27 Self {
28 info,
29 read_status: Arc::new(Mutex::new(
30 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31 )),
32 write_status: Arc::new(Mutex::new(
33 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34 )),
35 }
36 }
37 pub fn new_boxed(
38 info: mssf_core::types::ServicePartitionInformation,
39 ) -> Box<dyn IStatefulServicePartition> {
40 Box::new(Self::new(info))
41 }
42 pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43 *self.read_status.lock().unwrap() = status;
44 }
45 pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46 *self.write_status.lock().unwrap() = status;
47 }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51 fn create_replicator(
52 &self,
53 ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54 unimplemented!("Not implemented")
55 }
56
57 fn get_partition_information(
58 &self,
59 ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60 Ok(self.info.clone())
61 }
62
63 fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64 Ok(self.read_status.lock().unwrap().clone())
65 }
66
67 fn get_write_status(
68 &self,
69 ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70 Ok(self.write_status.lock().unwrap().clone())
71 }
72
73 fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74 Ok(())
75 }
76
77 fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78 Ok(())
79 }
80
81 fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82 Ok(())
83 }
84
85 fn report_partition_health(
86 &self,
87 _healthinfo: &mssf_core::types::HealthInformation,
88 ) -> mssf_core::Result<()> {
89 Ok(())
90 }
91
92 fn report_replica_health(
93 &self,
94 _healthinfo: &mssf_core::types::HealthInformation,
95 ) -> mssf_core::Result<()> {
96 Ok(())
97 }
98
99 fn try_get_com(
100 &self,
101 ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102 Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103 }
104}
105
106#[derive(Clone)]
107pub struct CreateStatefulServicePartitionArg {
108 pub partition_id: GUID,
109 pub replica_count: usize,
110 pub init_data: Vec<u8>,
111 pub service_name: Uri,
112 pub service_type_name: WString,
113}
114
115pub struct StatefulServicePartitionDriver {
117 factory_index: i64,
119 service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
120 replica_index: i64,
121 epoch_index: Epoch, partition_state: PartitionState,
123}
124
125struct PartitionState {
126 pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
128 pub primary_index: i64,
129 pub epoch: Epoch,
130 pub static_info: Option<CreateStatefulServicePartitionArg>, pub current_configuration: mssf_core::types::ReplicaSetConfig,
133}
134
135struct StatefulServiceReplicaState {
136 pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
137 pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
138 pub partition: StatefulServicePartitionMock,
139 pub factory_index: i64, pub _replica_address: WString,
141 pub _replicator_address: WString,
142}
143
144impl Default for StatefulServicePartitionDriver {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150impl StatefulServicePartitionDriver {
151 pub fn new() -> Self {
152 Self {
153 service_factory: Vec::new(),
154 factory_index: 0,
155 replica_index: 1, epoch_index: Epoch {
157 data_loss_number: 0,
158 configuration_number: 1,
159 },
160 partition_state: PartitionState {
161 replica_states: HashMap::new(),
162 primary_index: 1, epoch: Epoch {
164 data_loss_number: 0,
165 configuration_number: 1,
166 },
167 static_info: None,
168 current_configuration: mssf_core::types::ReplicaSetConfig {
169 replicas: vec![],
170 write_quorum: 0,
171 },
172 },
173 }
174 }
175
176 pub fn register_service_factory(
180 &mut self,
181 factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
182 ) {
183 self.service_factory.push(factory);
184 }
185
186 fn get_round_robin_factory(
191 &mut self,
192 ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
193 assert!(!self.service_factory.is_empty());
194 let idx = self.factory_index as usize % self.service_factory.len();
195 self.factory_index += 1;
196 (idx as i64, &*self.service_factory[idx])
197 }
198
199 fn next_replica_index(&mut self) -> i64 {
200 let idx = self.replica_index;
201 self.replica_index += 1;
202 idx
203 }
204
205 fn next_epoch_index(&mut self) -> Epoch {
206 let idx = self.epoch_index.clone();
207 self.epoch_index.configuration_number += 1;
208 idx
209 }
210
211 fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
212 let state = self
213 .partition_state
214 .replica_states
215 .get(&self.partition_state.primary_index)
216 .ok_or_else(|| {
217 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
218 })?;
219 Ok(state)
220 }
221
222 fn check_partition_state(&self) {
225 if self.partition_state.replica_states.is_empty() {
226 assert!(self.partition_state.static_info.is_none());
227 assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
228 assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
229 return;
230 }
231 self.get_primary_state().unwrap();
233 let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
235 assert_eq!(
236 self.partition_state.current_configuration.write_quorum,
237 expected_quorum
238 );
239 }
240}
241
242impl StatefulServicePartitionDriver {
244 pub fn get_primary_replica_id(&self) -> i64 {
246 self.partition_state.primary_index
247 }
248 pub fn get_replica(
250 &self,
251 replica_id: i64,
252 ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
253 let state = self.partition_state.replica_states.get(&replica_id);
254 state.map(|s| s.replica.as_ref())
255 }
256 pub fn get_replicator(
258 &self,
259 replica_id: i64,
260 ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
261 let state = self.partition_state.replica_states.get(&replica_id);
262 state.map(|s| s.replicator.as_ref())
263 }
264 pub fn list_replica_ids(&self) -> Vec<i64> {
266 self.partition_state
267 .replica_states
268 .keys()
269 .cloned()
270 .collect()
271 }
272}
273
274impl StatefulServicePartitionDriver {
276 pub async fn create_service_partition(
280 &mut self,
281 desc: &CreateStatefulServicePartitionArg,
282 ) -> mssf_core::Result<()> {
283 assert!(desc.replica_count > 0);
284 assert!(self.partition_state.replica_states.is_empty());
285
286 let mut replicas = HashMap::new();
287 let mut replicators = HashMap::new();
288 let mut replica_addresses = HashMap::new();
289 let mut replicator_addresses = HashMap::new();
290 let mut replica_infos = HashMap::new();
291 let mut partitions = HashMap::new();
292
293 for _ in 0..desc.replica_count {
294 let id = self.next_replica_index();
295 let (factory_index, factory) = self.get_round_robin_factory();
296 let replica = factory
297 .create_replica(
298 desc.service_type_name.clone(),
299 desc.service_name.clone(),
300 &desc.init_data,
301 desc.partition_id,
302 id,
303 )
304 .inspect_err(|e| {
305 tracing::error!("Failed to create stateful service replica: {:?}", e)
306 })?;
307 let prev = replicas.insert(id, (factory_index, replica));
308 assert!(prev.is_none(), "Service replica already exists");
309 }
310
311 for (id, (_, replica)) in &replicas {
313 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
314 let partition =
316 StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
317 mssf_core::types::SingletonPartitionInformation {
318 id: desc.partition_id,
319 },
320 ));
321 let replctr = replica
322 .open(
323 mssf_core::types::OpenMode::New,
324 Arc::new(partition.clone()),
325 cancellation_token,
326 )
327 .await?;
328 replicators.insert(*id, replctr);
329 partitions.insert(*id, partition);
330 }
331
332 for (id, replctr) in &replicators {
334 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
335
336 let replctr_addr = replctr.open(cancellation_token).await?;
337 replicator_addresses.insert(*id, replctr_addr);
338 }
339
340 let primary_index = 1;
342 let epoch = self.next_epoch_index();
343 for (id, rplctr) in &replicators {
344 let epoch_cp = epoch.clone();
345 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
346 if *id == primary_index {
347 rplctr
348 .change_role(
349 epoch_cp,
350 mssf_core::types::ReplicaRole::Primary,
351 cancellation_token,
352 )
353 .await?;
354 self.partition_state.primary_index = primary_index;
355 } else {
356 rplctr
357 .change_role(
358 epoch_cp,
359 mssf_core::types::ReplicaRole::IdleSecondary,
360 cancellation_token,
361 )
362 .await?;
363 }
364 }
365
366 for (id, (_, replica)) in &replicas {
368 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
369 let replica_addr = if *id == self.partition_state.primary_index {
370 replica
371 .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
372 .await?
373 } else {
374 replica
375 .change_role(
376 mssf_core::types::ReplicaRole::IdleSecondary,
377 cancellation_token,
378 )
379 .await?
380 };
381 replica_addresses.insert(*id, replica_addr);
382 }
383
384 let primary = replicators
386 .get(&self.partition_state.primary_index)
387 .unwrap();
388 for (id, (_, replica)) in &replicas {
389 if *id == self.partition_state.primary_index {
390 let replica_info = mssf_core::types::ReplicaInformation {
391 replicator_address: replicator_addresses.get(id).unwrap().clone(),
392 id: *id,
393 role: mssf_core::types::ReplicaRole::Primary,
394 status: mssf_core::types::ReplicaStatus::Up,
395 current_progress: -1, catch_up_capability: -1,
397 must_catch_up: false,
398 };
399 replica_infos.insert(*id, replica_info);
400 continue;
401 }
402 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
403 let replica_info = mssf_core::types::ReplicaInformation {
404 replicator_address: replicator_addresses.get(id).unwrap().clone(),
405 id: *id,
406 role: mssf_core::types::ReplicaRole::IdleSecondary,
407 status: mssf_core::types::ReplicaStatus::Up,
408 current_progress: -1,
409 catch_up_capability: -1,
410 must_catch_up: false,
411 };
412 replica_infos.insert(*id, replica_info.clone());
413 primary
414 .build_replica(replica_info, cancellation_token)
415 .await?;
416 replica
418 .change_role(
419 mssf_core::types::ReplicaRole::ActiveSecondary,
420 SimpleCancelToken::new_boxed(),
421 )
422 .await?;
423 replica_infos.get_mut(id).unwrap().role =
425 mssf_core::types::ReplicaRole::ActiveSecondary;
426 }
427
428 let mut new_config = mssf_core::types::ReplicaSetConfig {
430 replicas: vec![],
431 write_quorum: 1, };
433 self.partition_state.current_configuration = new_config.clone();
435 let mut ready_replicas = 1;
436 for id in replicas.keys() {
437 if *id == self.partition_state.primary_index {
438 continue;
439 }
440 let prev_config = new_config.clone();
441 let replica_info = replica_infos.get(id).unwrap().clone();
443 new_config.replicas.push(replica_info);
444 ready_replicas += 1;
445 new_config.write_quorum = ready_replicas / 2 + 1_u32;
446
447 primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
448
449 primary
451 .wait_for_catch_up_quorum(
452 mssf_core::types::ReplicaSetQuorumMode::Write,
453 SimpleCancelToken::new_boxed(),
454 )
455 .await?;
456 primary.update_current_replica_set_configuration(new_config.clone())?;
458 self.partition_state.current_configuration = new_config.clone();
459 }
460
461 for (id, partition) in &partitions {
466 if *id == self.partition_state.primary_index {
467 partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
468 partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
469 } else {
470 partition
471 .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
472 partition
473 .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
474 }
475 }
476
477 for (id, (factory_index, replica)) in replicas {
479 let state = StatefulServiceReplicaState {
480 replica,
481 replicator: replicators.remove(&id).unwrap(),
482 _replica_address: replica_addresses.remove(&id).unwrap(),
483 _replicator_address: replicator_addresses.remove(&id).unwrap(),
484 partition: partitions.remove(&id).unwrap(),
485 factory_index,
486 };
487 self.partition_state.replica_states.insert(id, state);
488 }
489 self.partition_state.epoch = epoch;
490 self.partition_state.static_info = Some(desc.clone());
491
492 self.check_partition_state();
493 Ok(())
494 }
495
496 pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
498 for state in self.partition_state.replica_states.values_mut() {
502 state.partition.set_read_status(
503 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
504 );
505 state.partition.set_write_status(
506 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
507 );
508 }
509
510 let primary = self
512 .partition_state
513 .replica_states
514 .get_mut(&1)
515 .expect("Primary replica not found");
516
517 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
518 primary
519 .replica
520 .change_role(
521 mssf_core::types::ReplicaRole::ActiveSecondary,
522 cancellation_token,
523 )
524 .await?;
525 primary
526 .replicator
527 .change_role(
528 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::ActiveSecondary,
530 SimpleCancelToken::new_boxed(),
531 )
532 .await?;
533
534 for state in self.partition_state.replica_states.values_mut() {
536 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
537 state
538 .replica
539 .change_role(mssf_core::types::ReplicaRole::None, cancellation_token)
540 .await?;
541 state
542 .replicator
543 .change_role(
544 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::None,
546 SimpleCancelToken::new_boxed(),
547 )
548 .await?;
549 }
550
551 for state in self.partition_state.replica_states.values_mut() {
553 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
554 state.replica.close(cancellation_token.clone()).await?;
555 state.replicator.close(cancellation_token).await?;
556 }
557
558 self.partition_state.replica_states.clear();
560 self.partition_state.static_info = None;
561 self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
562 replicas: vec![],
563 write_quorum: 0,
564 };
565 self.check_partition_state();
566 Ok(())
567 }
568
569 pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
571 {
573 self.partition_state
574 .replica_states
575 .get_mut(&replica_id)
576 .ok_or_else(|| {
577 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
578 })?;
579 if replica_id == self.partition_state.primary_index {
581 tracing::error!(
582 "Replica {} is primary, cannot restart as secondary",
583 replica_id
584 );
585 return Err(mssf_core::Error::from(
586 mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
587 ));
588 }
589 }
590
591 {
593 let primary = self.get_primary_state().unwrap();
594 let current_config = self.partition_state.current_configuration.clone();
595 let replica_count = current_config.replicas.len();
596 let new_replicas = current_config
597 .replicas
598 .iter()
599 .filter(|r| r.id != replica_id)
600 .cloned()
601 .collect::<Vec<_>>();
602 let write_quorum = (replica_count as u32) / 2 + 1; let new_config = mssf_core::types::ReplicaSetConfig {
604 replicas: new_replicas,
605 write_quorum,
606 };
607 primary
608 .replicator
609 .update_current_replica_set_configuration(new_config.clone())?;
610 self.partition_state.current_configuration = new_config;
611 }
612
613 let prev_state = self
614 .partition_state
615 .replica_states
616 .remove(&replica_id)
617 .unwrap();
618 let factory_index = prev_state.factory_index;
619 {
621 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
622 prev_state.replica.close(cancellation_token.clone()).await?;
623 prev_state.replicator.close(cancellation_token).await?;
624 drop(prev_state);
625 }
626
627 let factory = &*self.service_factory[factory_index as usize];
629 let replica = factory
630 .create_replica(
631 self.partition_state
632 .static_info
633 .as_ref()
634 .unwrap()
635 .service_type_name
636 .clone(),
637 self.partition_state
638 .static_info
639 .as_ref()
640 .unwrap()
641 .service_name
642 .clone(),
643 &self.partition_state.static_info.as_ref().unwrap().init_data,
644 self.partition_state
645 .static_info
646 .as_ref()
647 .unwrap()
648 .partition_id,
649 replica_id,
650 )
651 .inspect_err(|e| {
652 tracing::error!("Failed to create stateful service replica: {:?}", e)
653 })?;
654 let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
656 mssf_core::types::SingletonPartitionInformation {
657 id: self
658 .partition_state
659 .static_info
660 .as_ref()
661 .unwrap()
662 .partition_id,
663 },
664 ));
665 let replctr = replica
667 .open(
668 mssf_core::types::OpenMode::Existing,
669 Arc::new(partition.clone()),
670 SimpleCancelToken::new_boxed(),
671 )
672 .await
673 .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
674 let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
676 replctr
678 .change_role(
679 self.partition_state.epoch.clone(),
680 mssf_core::types::ReplicaRole::IdleSecondary,
681 SimpleCancelToken::new_boxed(),
682 )
683 .await?;
684 let replica_addr = replica
685 .change_role(
686 mssf_core::types::ReplicaRole::IdleSecondary,
687 SimpleCancelToken::new_boxed(),
688 )
689 .await?;
690
691 let primary = self.get_primary_state().unwrap();
693
694 let replica_info = mssf_core::types::ReplicaInformation {
695 replicator_address: replctr_addr.clone(),
696 id: replica_id,
697 role: mssf_core::types::ReplicaRole::IdleSecondary,
698 status: mssf_core::types::ReplicaStatus::Up,
699 current_progress: -1, catch_up_capability: -1,
701 must_catch_up: false,
702 };
703 primary
704 .replicator
705 .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
706 .await?;
707
708 replica
710 .change_role(
711 mssf_core::types::ReplicaRole::ActiveSecondary,
712 SimpleCancelToken::new_boxed(),
713 )
714 .await?;
715 let mut updated_replica_info = replica_info.clone();
717 updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
718 let prev_config = self.partition_state.current_configuration.clone();
720 let mut new_config_replicas = prev_config.replicas.clone();
721 new_config_replicas.push(updated_replica_info.clone());
722
723 let total_replica_count = new_config_replicas.len() + 1; let write_quorum = (total_replica_count as u32) / 2 + 1;
725 let new_config = mssf_core::types::ReplicaSetConfig {
726 replicas: new_config_replicas,
727 write_quorum,
728 };
729 primary
730 .replicator
731 .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
732 primary
734 .replicator
735 .wait_for_catch_up_quorum(
736 mssf_core::types::ReplicaSetQuorumMode::Write,
737 SimpleCancelToken::new_boxed(),
738 )
739 .await?;
740 primary
742 .replicator
743 .update_current_replica_set_configuration(new_config.clone())?;
744 self.partition_state.current_configuration = new_config;
745 let state = StatefulServiceReplicaState {
747 replica,
748 replicator: replctr,
749 _replica_address: replica_addr,
750 _replicator_address: replctr_addr,
751 partition,
752 factory_index,
753 };
754 let prev = self
755 .partition_state
756 .replica_states
757 .insert(replica_id, state);
758 assert!(prev.is_none(), "Service replica already exists");
759 self.check_partition_state();
761 Ok(())
762 }
763}