1use std::{
7 collections::{BTreeMap, HashMap},
8 sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12 GUID, WString,
13 runtime::IStatefulServicePartition,
14 sync::SimpleCancelToken,
15 types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20 info: mssf_core::types::ServicePartitionInformation,
21 read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22 write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26 pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27 Self {
28 info,
29 read_status: Arc::new(Mutex::new(
30 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31 )),
32 write_status: Arc::new(Mutex::new(
33 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34 )),
35 }
36 }
37 pub fn new_boxed(
38 info: mssf_core::types::ServicePartitionInformation,
39 ) -> Box<dyn IStatefulServicePartition> {
40 Box::new(Self::new(info))
41 }
42 pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43 *self.read_status.lock().unwrap() = status;
44 }
45 pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46 *self.write_status.lock().unwrap() = status;
47 }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51 fn create_replicator(
52 &self,
53 ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54 unimplemented!("Not implemented")
55 }
56
57 fn get_partition_information(
58 &self,
59 ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60 Ok(self.info.clone())
61 }
62
63 fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64 Ok(*self.read_status.lock().unwrap())
65 }
66
67 fn get_write_status(
68 &self,
69 ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70 Ok(*self.write_status.lock().unwrap())
71 }
72
73 fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74 Ok(())
75 }
76
77 fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78 Ok(())
79 }
80
81 fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82 Ok(())
83 }
84
85 fn report_partition_health(
86 &self,
87 _healthinfo: &mssf_core::types::HealthInformation,
88 ) -> mssf_core::Result<()> {
89 Ok(())
90 }
91
92 fn report_replica_health(
93 &self,
94 _healthinfo: &mssf_core::types::HealthInformation,
95 ) -> mssf_core::Result<()> {
96 Ok(())
97 }
98
99 fn try_get_com(
100 &self,
101 ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102 Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103 }
104}
105
106#[derive(Clone)]
107pub struct CreateStatefulServicePartitionArg {
108 pub partition_id: GUID,
109 pub replica_count: usize,
110 pub init_data: Vec<u8>,
111 pub service_name: Uri,
112 pub service_type_name: WString,
113}
114
115pub struct StatefulServicePartitionDriver {
117 factory_index: i64,
119 service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
120 replica_index: i64,
121 epoch_index: Epoch, partition_state: PartitionState,
123}
124
125struct PartitionState {
126 pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
128 pub primary_index: i64,
129 pub epoch: Epoch,
130 pub static_info: Option<CreateStatefulServicePartitionArg>, pub current_configuration: mssf_core::types::ReplicaSetConfig,
133}
134
135struct StatefulServiceReplicaState {
136 pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
137 pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
138 pub partition: StatefulServicePartitionMock,
139 pub factory_index: i64, pub _replica_address: WString,
141 pub _replicator_address: WString,
142}
143
144impl Default for StatefulServicePartitionDriver {
145 fn default() -> Self {
146 Self::new()
147 }
148}
149
150impl StatefulServicePartitionDriver {
151 pub fn new() -> Self {
152 Self {
153 service_factory: Vec::new(),
154 factory_index: 0,
155 replica_index: 1, epoch_index: Epoch {
157 data_loss_number: 0,
158 configuration_number: 1,
159 },
160 partition_state: PartitionState {
161 replica_states: HashMap::new(),
162 primary_index: 1, epoch: Epoch {
164 data_loss_number: 0,
165 configuration_number: 1,
166 },
167 static_info: None,
168 current_configuration: mssf_core::types::ReplicaSetConfig {
169 replicas: vec![],
170 write_quorum: 0,
171 },
172 },
173 }
174 }
175
176 pub fn register_service_factory(
180 &mut self,
181 factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
182 ) {
183 self.service_factory.push(factory);
184 }
185
186 fn get_round_robin_factory(
191 &mut self,
192 ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
193 assert!(!self.service_factory.is_empty());
194 let idx = self.factory_index as usize % self.service_factory.len();
195 self.factory_index += 1;
196 (idx as i64, &*self.service_factory[idx])
197 }
198
199 fn next_replica_index(&mut self) -> i64 {
200 let idx = self.replica_index;
201 self.replica_index += 1;
202 idx
203 }
204
205 fn next_epoch_index(&mut self) -> Epoch {
206 let idx = self.epoch_index.clone();
207 self.epoch_index.configuration_number += 1;
208 idx
209 }
210
211 fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
212 let state = self
213 .partition_state
214 .replica_states
215 .get(&self.partition_state.primary_index)
216 .ok_or_else(|| {
217 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
218 })?;
219 Ok(state)
220 }
221
222 fn check_partition_state(&self) {
225 if self.partition_state.replica_states.is_empty() {
226 assert!(self.partition_state.static_info.is_none());
227 assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
228 assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
229 return;
230 }
231 self.get_primary_state().unwrap();
233 let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
235 assert_eq!(
236 self.partition_state.current_configuration.write_quorum,
237 expected_quorum
238 );
239 }
240}
241
242impl StatefulServicePartitionDriver {
244 pub fn get_primary_replica_id(&self) -> i64 {
246 self.partition_state.primary_index
247 }
248 pub fn get_replica(
250 &self,
251 replica_id: i64,
252 ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
253 let state = self.partition_state.replica_states.get(&replica_id);
254 state.map(|s| s.replica.as_ref())
255 }
256 pub fn get_replicator(
258 &self,
259 replica_id: i64,
260 ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
261 let state = self.partition_state.replica_states.get(&replica_id);
262 state.map(|s| s.replicator.as_ref())
263 }
264 pub fn list_replica_ids(&self) -> Vec<i64> {
266 self.partition_state
267 .replica_states
268 .keys()
269 .cloned()
270 .collect()
271 }
272}
273
274impl StatefulServicePartitionDriver {
276 pub async fn create_service_partition(
280 &mut self,
281 desc: &CreateStatefulServicePartitionArg,
282 ) -> mssf_core::Result<()> {
283 assert!(desc.replica_count > 0);
284 assert!(self.partition_state.replica_states.is_empty());
285
286 let mut replicas = BTreeMap::new();
287 let mut replicators = BTreeMap::new();
288 let mut replica_addresses = BTreeMap::new();
289 let mut replicator_addresses = BTreeMap::new();
290 let mut replica_infos = BTreeMap::new();
291 let mut partitions = BTreeMap::new();
292
293 for _ in 0..desc.replica_count {
294 let id = self.next_replica_index();
295 let (factory_index, factory) = self.get_round_robin_factory();
296 let replica = factory
297 .create_replica(
298 desc.service_type_name.clone(),
299 desc.service_name.clone(),
300 &desc.init_data,
301 desc.partition_id,
302 id,
303 )
304 .inspect_err(|e| {
305 tracing::error!("Failed to create stateful service replica: {:?}", e)
306 })?;
307 let prev = replicas.insert(id, (factory_index, replica));
308 assert!(prev.is_none(), "Service replica already exists");
309 }
310
311 for (id, (_, replica)) in &replicas {
313 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
314 let partition =
316 StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
317 mssf_core::types::SingletonPartitionInformation {
318 id: desc.partition_id,
319 },
320 ));
321 let replctr = replica
322 .open(
323 mssf_core::types::OpenMode::New,
324 Arc::new(partition.clone()),
325 cancellation_token,
326 )
327 .await?;
328 replicators.insert(*id, replctr);
329 partitions.insert(*id, partition);
330 }
331
332 for (id, replctr) in &replicators {
334 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
335
336 let replctr_addr = replctr.open(cancellation_token).await?;
337 replicator_addresses.insert(*id, replctr_addr);
338 }
339
340 let primary_index = 1;
342 let epoch = self.next_epoch_index();
343 for (id, rplctr) in &replicators {
344 let epoch_cp = epoch.clone();
345 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
346 if *id == primary_index {
347 rplctr
348 .change_role(
349 epoch_cp,
350 mssf_core::types::ReplicaRole::Primary,
351 cancellation_token,
352 )
353 .await?;
354 self.partition_state.primary_index = primary_index;
355 } else {
356 rplctr
357 .change_role(
358 epoch_cp,
359 mssf_core::types::ReplicaRole::IdleSecondary,
360 cancellation_token,
361 )
362 .await?;
363 }
364 }
365
366 for (id, (_, replica)) in &replicas {
368 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
369 let replica_addr = if *id == self.partition_state.primary_index {
370 replica
371 .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
372 .await?
373 } else {
374 replica
375 .change_role(
376 mssf_core::types::ReplicaRole::IdleSecondary,
377 cancellation_token,
378 )
379 .await?
380 };
381 replica_addresses.insert(*id, replica_addr);
382 }
383
384 let primary = replicators
386 .get(&self.partition_state.primary_index)
387 .unwrap();
388 for (id, (_, replica)) in &replicas {
389 if *id == self.partition_state.primary_index {
390 let replica_info = mssf_core::types::ReplicaInformation {
391 replicator_address: replicator_addresses.get(id).unwrap().clone(),
392 id: *id,
393 role: mssf_core::types::ReplicaRole::Primary,
394 status: mssf_core::types::ReplicaStatus::Up,
395 current_progress: -1, catch_up_capability: -1,
397 must_catch_up: false,
398 };
399 replica_infos.insert(*id, replica_info);
400 continue;
401 }
402 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
403 let replica_info = mssf_core::types::ReplicaInformation {
404 replicator_address: replicator_addresses.get(id).unwrap().clone(),
405 id: *id,
406 role: mssf_core::types::ReplicaRole::IdleSecondary,
407 status: mssf_core::types::ReplicaStatus::Up,
408 current_progress: -1,
409 catch_up_capability: -1,
410 must_catch_up: false,
411 };
412 replica_infos.insert(*id, replica_info.clone());
413 primary
414 .build_replica(replica_info, cancellation_token)
415 .await?;
416 let rplctr = replicators.get(id).unwrap();
418 rplctr
419 .change_role(
420 epoch.clone(),
421 mssf_core::types::ReplicaRole::ActiveSecondary,
422 SimpleCancelToken::new_boxed(),
423 )
424 .await?;
425 replica
427 .change_role(
428 mssf_core::types::ReplicaRole::ActiveSecondary,
429 SimpleCancelToken::new_boxed(),
430 )
431 .await?;
432 replica_infos.get_mut(id).unwrap().role =
434 mssf_core::types::ReplicaRole::ActiveSecondary;
435 }
436
437 let mut new_config = mssf_core::types::ReplicaSetConfig {
439 replicas: vec![],
440 write_quorum: 1, };
442 self.partition_state.current_configuration = new_config.clone();
444 let mut ready_replicas = 1;
445 for id in replicas.keys() {
446 if *id == self.partition_state.primary_index {
447 continue;
448 }
449 let prev_config = new_config.clone();
450 let replica_info = replica_infos.get(id).unwrap().clone();
452 new_config.replicas.push(replica_info);
453 ready_replicas += 1;
454 new_config.write_quorum = ready_replicas / 2 + 1_u32;
455
456 primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
457
458 primary
460 .wait_for_catch_up_quorum(
461 mssf_core::types::ReplicaSetQuorumMode::Write,
462 SimpleCancelToken::new_boxed(),
463 )
464 .await?;
465 primary.update_current_replica_set_configuration(new_config.clone())?;
467 self.partition_state.current_configuration = new_config.clone();
468 }
469
470 for (id, partition) in &partitions {
475 if *id == self.partition_state.primary_index {
476 partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
477 partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
478 } else {
479 partition
480 .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
481 partition
482 .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
483 }
484 }
485
486 for (id, (factory_index, replica)) in replicas {
488 let state = StatefulServiceReplicaState {
489 replica,
490 replicator: replicators.remove(&id).unwrap(),
491 _replica_address: replica_addresses.remove(&id).unwrap(),
492 _replicator_address: replicator_addresses.remove(&id).unwrap(),
493 partition: partitions.remove(&id).unwrap(),
494 factory_index,
495 };
496 self.partition_state.replica_states.insert(id, state);
497 }
498 self.partition_state.epoch = epoch;
499 self.partition_state.static_info = Some(desc.clone());
500
501 self.check_partition_state();
502 Ok(())
503 }
504
505 pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
507 for state in self.partition_state.replica_states.values_mut() {
511 state.partition.set_read_status(
512 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
513 );
514 state.partition.set_write_status(
515 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
516 );
517 }
518
519 let primary = self
521 .partition_state
522 .replica_states
523 .get_mut(&self.partition_state.primary_index)
524 .expect("Primary replica not found");
525
526 primary
528 .replicator
529 .change_role(
530 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::ActiveSecondary,
532 SimpleCancelToken::new_boxed(),
533 )
534 .await?;
535 primary
536 .replica
537 .change_role(
538 mssf_core::types::ReplicaRole::ActiveSecondary,
539 SimpleCancelToken::new_boxed(),
540 )
541 .await?;
542
543 for state in self.partition_state.replica_states.values_mut() {
546 state
547 .replicator
548 .change_role(
549 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::None,
551 SimpleCancelToken::new_boxed(),
552 )
553 .await?;
554 state
555 .replica
556 .change_role(
557 mssf_core::types::ReplicaRole::None,
558 SimpleCancelToken::new_boxed(),
559 )
560 .await?;
561 }
562
563 for state in self.partition_state.replica_states.values_mut() {
565 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
566 state.replica.close(cancellation_token.clone()).await?;
567 state.replicator.close(cancellation_token).await?;
568 }
569
570 self.partition_state.replica_states.clear();
572 self.partition_state.static_info = None;
573 self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
574 replicas: vec![],
575 write_quorum: 0,
576 };
577 self.check_partition_state();
578 Ok(())
579 }
580
581 pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
583 {
585 self.partition_state
586 .replica_states
587 .get_mut(&replica_id)
588 .ok_or_else(|| {
589 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
590 })?;
591 if replica_id == self.partition_state.primary_index {
593 tracing::error!(
594 "Replica {} is primary, cannot restart as secondary",
595 replica_id
596 );
597 return Err(mssf_core::Error::from(
598 mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
599 ));
600 }
601 }
602
603 {
605 let primary = self.get_primary_state().unwrap();
606 let current_config = self.partition_state.current_configuration.clone();
607 let replica_count = current_config.replicas.len();
608 let new_replicas = current_config
609 .replicas
610 .iter()
611 .filter(|r| r.id != replica_id)
612 .cloned()
613 .collect::<Vec<_>>();
614 let write_quorum = (replica_count as u32) / 2 + 1; let new_config = mssf_core::types::ReplicaSetConfig {
616 replicas: new_replicas,
617 write_quorum,
618 };
619 primary
620 .replicator
621 .update_current_replica_set_configuration(new_config.clone())?;
622 self.partition_state.current_configuration = new_config;
623 }
624
625 let prev_state = self
626 .partition_state
627 .replica_states
628 .remove(&replica_id)
629 .unwrap();
630 let factory_index = prev_state.factory_index;
631 {
633 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
634 prev_state
635 .replicator
636 .close(cancellation_token.clone())
637 .await?;
638 prev_state.replica.close(cancellation_token).await?;
639 drop(prev_state);
640 }
641
642 let factory = &*self.service_factory[factory_index as usize];
644 let replica = factory
645 .create_replica(
646 self.partition_state
647 .static_info
648 .as_ref()
649 .unwrap()
650 .service_type_name
651 .clone(),
652 self.partition_state
653 .static_info
654 .as_ref()
655 .unwrap()
656 .service_name
657 .clone(),
658 &self.partition_state.static_info.as_ref().unwrap().init_data,
659 self.partition_state
660 .static_info
661 .as_ref()
662 .unwrap()
663 .partition_id,
664 replica_id,
665 )
666 .inspect_err(|e| {
667 tracing::error!("Failed to create stateful service replica: {:?}", e)
668 })?;
669 let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
671 mssf_core::types::SingletonPartitionInformation {
672 id: self
673 .partition_state
674 .static_info
675 .as_ref()
676 .unwrap()
677 .partition_id,
678 },
679 ));
680 let replctr = replica
682 .open(
683 mssf_core::types::OpenMode::Existing,
684 Arc::new(partition.clone()),
685 SimpleCancelToken::new_boxed(),
686 )
687 .await
688 .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
689 let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
691 replctr
693 .change_role(
694 self.partition_state.epoch.clone(),
695 mssf_core::types::ReplicaRole::IdleSecondary,
696 SimpleCancelToken::new_boxed(),
697 )
698 .await?;
699 let replica_addr = replica
700 .change_role(
701 mssf_core::types::ReplicaRole::IdleSecondary,
702 SimpleCancelToken::new_boxed(),
703 )
704 .await?;
705
706 let primary = self.get_primary_state().unwrap();
708
709 let replica_info = mssf_core::types::ReplicaInformation {
710 replicator_address: replctr_addr.clone(),
711 id: replica_id,
712 role: mssf_core::types::ReplicaRole::IdleSecondary,
713 status: mssf_core::types::ReplicaStatus::Up,
714 current_progress: -1, catch_up_capability: -1,
716 must_catch_up: false,
717 };
718 primary
719 .replicator
720 .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
721 .await?;
722
723 replctr
726 .change_role(
727 self.partition_state.epoch.clone(),
728 mssf_core::types::ReplicaRole::ActiveSecondary,
729 SimpleCancelToken::new_boxed(),
730 )
731 .await?;
732 replica
733 .change_role(
734 mssf_core::types::ReplicaRole::ActiveSecondary,
735 SimpleCancelToken::new_boxed(),
736 )
737 .await?;
738 let mut updated_replica_info = replica_info.clone();
740 updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
741 let prev_config = self.partition_state.current_configuration.clone();
743 let mut new_config_replicas = prev_config.replicas.clone();
744 new_config_replicas.push(updated_replica_info.clone());
745
746 let total_replica_count = new_config_replicas.len() + 1; let write_quorum = (total_replica_count as u32) / 2 + 1;
748 let new_config = mssf_core::types::ReplicaSetConfig {
749 replicas: new_config_replicas,
750 write_quorum,
751 };
752 primary
753 .replicator
754 .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
755 primary
757 .replicator
758 .wait_for_catch_up_quorum(
759 mssf_core::types::ReplicaSetQuorumMode::Write,
760 SimpleCancelToken::new_boxed(),
761 )
762 .await?;
763 primary
765 .replicator
766 .update_current_replica_set_configuration(new_config.clone())?;
767 self.partition_state.current_configuration = new_config;
768 let state = StatefulServiceReplicaState {
770 replica,
771 replicator: replctr,
772 _replica_address: replica_addr,
773 _replicator_address: replctr_addr,
774 partition,
775 factory_index,
776 };
777 let prev = self
778 .partition_state
779 .replica_states
780 .insert(replica_id, state);
781 assert!(prev.is_none(), "Service replica already exists");
782 self.check_partition_state();
784 Ok(())
785 }
786}