1use std::{
7 collections::HashMap,
8 sync::{Arc, Mutex},
9};
10
11use mssf_core::{
12 GUID, WString,
13 runtime::IStatefulServicePartition,
14 sync::SimpleCancelToken,
15 types::{Epoch, ServicePartitionInformation, Uri},
16};
17
18#[derive(Clone)]
19pub struct StatefulServicePartitionMock {
20 info: mssf_core::types::ServicePartitionInformation,
21 read_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
22 write_status: Arc<Mutex<mssf_core::types::ServicePartitionAccessStatus>>,
23}
24
25impl StatefulServicePartitionMock {
26 pub fn new(info: mssf_core::types::ServicePartitionInformation) -> Self {
27 Self {
28 info,
29 read_status: Arc::new(Mutex::new(
30 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
31 )),
32 write_status: Arc::new(Mutex::new(
33 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
34 )),
35 }
36 }
37 pub fn new_boxed(
38 info: mssf_core::types::ServicePartitionInformation,
39 ) -> Box<dyn IStatefulServicePartition> {
40 Box::new(Self::new(info))
41 }
42 pub fn set_read_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
43 *self.read_status.lock().unwrap() = status;
44 }
45 pub fn set_write_status(&self, status: mssf_core::types::ServicePartitionAccessStatus) {
46 *self.write_status.lock().unwrap() = status;
47 }
48}
49
50impl IStatefulServicePartition for StatefulServicePartitionMock {
51 fn create_replicator(
52 &self,
53 ) -> mssf_core::Result<Box<dyn mssf_core::runtime::IPrimaryReplicator>> {
54 unimplemented!("Not implemented")
55 }
56
57 fn get_partition_information(
58 &self,
59 ) -> mssf_core::Result<mssf_core::types::ServicePartitionInformation> {
60 Ok(self.info.clone())
61 }
62
63 fn get_read_status(&self) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
64 Ok(self.read_status.lock().unwrap().clone())
65 }
66
67 fn get_write_status(
68 &self,
69 ) -> mssf_core::Result<mssf_core::types::ServicePartitionAccessStatus> {
70 Ok(self.write_status.lock().unwrap().clone())
71 }
72
73 fn report_load(&self, _metrics: &[mssf_core::types::LoadMetric]) -> mssf_core::Result<()> {
74 Ok(())
75 }
76
77 fn report_fault(&self, _fault_type: mssf_core::types::FaultType) -> mssf_core::Result<()> {
78 Ok(())
79 }
80
81 fn report_move_cost(&self, _move_cost: mssf_core::types::MoveCost) -> mssf_core::Result<()> {
82 Ok(())
83 }
84
85 fn report_partition_health(
86 &self,
87 _healthinfo: &mssf_core::types::HealthInformation,
88 ) -> mssf_core::Result<()> {
89 Ok(())
90 }
91
92 fn report_replica_health(
93 &self,
94 _healthinfo: &mssf_core::types::HealthInformation,
95 ) -> mssf_core::Result<()> {
96 Ok(())
97 }
98
99 fn try_get_com(
100 &self,
101 ) -> mssf_core::Result<&mssf_com::FabricRuntime::IFabricStatefulServicePartition> {
102 Err(mssf_core::ErrorCode::FABRIC_E_OPERATION_NOT_SUPPORTED.into())
103 }
104}
105
106#[derive(Clone)]
107pub struct CreateStatefulServicePartitionArg {
108 pub partition_id: GUID,
109 pub replica_count: usize,
110 pub init_data: Vec<u8>,
111 pub service_name: Uri,
112 pub service_type_name: WString,
113}
114
115pub struct StatefulServicePartitionDriver {
117 factory_index: i64,
119 service_factory: Vec<Box<dyn mssf_core::runtime::IStatefulServiceFactory>>,
120 replica_index: i64,
121 epoch_index: Epoch, partition_state: PartitionState,
123}
124
125struct PartitionState {
126 pub replica_states: HashMap<i64, StatefulServiceReplicaState>,
127 pub primary_index: i64,
128 pub epoch: Epoch,
129 pub static_info: Option<CreateStatefulServicePartitionArg>, pub current_configuration: mssf_core::types::ReplicaSetConfig,
131}
132
133struct StatefulServiceReplicaState {
134 pub replica: Box<dyn mssf_core::runtime::IStatefulServiceReplica>,
135 pub replicator: Box<dyn mssf_core::runtime::IPrimaryReplicator>,
136 pub partition: StatefulServicePartitionMock,
137 pub factory_index: i64, pub _replica_address: WString,
139 pub _replicator_address: WString,
140}
141
142impl Default for StatefulServicePartitionDriver {
143 fn default() -> Self {
144 Self::new()
145 }
146}
147
148impl StatefulServicePartitionDriver {
149 pub fn new() -> Self {
150 Self {
151 service_factory: Vec::new(),
152 factory_index: 0,
153 replica_index: 1, epoch_index: Epoch {
155 data_loss_number: 0,
156 configuration_number: 1,
157 },
158 partition_state: PartitionState {
159 replica_states: HashMap::new(),
160 primary_index: 1, epoch: Epoch {
162 data_loss_number: 0,
163 configuration_number: 1,
164 },
165 static_info: None,
166 current_configuration: mssf_core::types::ReplicaSetConfig {
167 replicas: vec![],
168 write_quorum: 0,
169 },
170 },
171 }
172 }
173
174 pub fn register_service_factory(
178 &mut self,
179 factory: Box<dyn mssf_core::runtime::IStatefulServiceFactory>,
180 ) {
181 self.service_factory.push(factory);
182 }
183
184 fn get_round_robin_factory(
189 &mut self,
190 ) -> (i64, &dyn mssf_core::runtime::IStatefulServiceFactory) {
191 assert!(!self.service_factory.is_empty());
192 let idx = self.factory_index as usize % self.service_factory.len();
193 self.factory_index += 1;
194 (idx as i64, &*self.service_factory[idx])
195 }
196
197 fn next_replica_index(&mut self) -> i64 {
198 let idx = self.replica_index;
199 self.replica_index += 1;
200 idx
201 }
202
203 fn next_epoch_index(&mut self) -> Epoch {
204 let idx = self.epoch_index.clone();
205 self.epoch_index.configuration_number += 1;
206 idx
207 }
208
209 fn get_primary_state(&self) -> mssf_core::Result<&StatefulServiceReplicaState> {
210 let state = self
211 .partition_state
212 .replica_states
213 .get(&self.partition_state.primary_index)
214 .ok_or_else(|| {
215 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
216 })?;
217 Ok(state)
218 }
219
220 fn check_partition_state(&self) {
223 if self.partition_state.replica_states.is_empty() {
224 assert!(self.partition_state.static_info.is_none());
225 assert_eq!(self.partition_state.current_configuration.replicas.len(), 0);
226 assert_eq!(self.partition_state.current_configuration.write_quorum, 0);
227 return;
228 }
229 self.get_primary_state().unwrap();
231 let expected_quorum = (self.partition_state.replica_states.len() as u32) / 2 + 1;
233 assert_eq!(
234 self.partition_state.current_configuration.write_quorum,
235 expected_quorum
236 );
237 }
238}
239
240impl StatefulServicePartitionDriver {
242 pub fn get_primary_replica_id(&self) -> i64 {
244 self.partition_state.primary_index
245 }
246 pub fn get_replica(
248 &self,
249 replica_id: i64,
250 ) -> Option<&dyn mssf_core::runtime::IStatefulServiceReplica> {
251 let state = self.partition_state.replica_states.get(&replica_id);
252 state.map(|s| s.replica.as_ref())
253 }
254 pub fn get_replicator(
256 &self,
257 replica_id: i64,
258 ) -> Option<&dyn mssf_core::runtime::IPrimaryReplicator> {
259 let state = self.partition_state.replica_states.get(&replica_id);
260 state.map(|s| s.replicator.as_ref())
261 }
262 pub fn list_replica_ids(&self) -> Vec<i64> {
264 self.partition_state
265 .replica_states
266 .keys()
267 .cloned()
268 .collect()
269 }
270}
271
272impl StatefulServicePartitionDriver {
274 pub async fn create_service_partition(
278 &mut self,
279 desc: &CreateStatefulServicePartitionArg,
280 ) -> mssf_core::Result<()> {
281 assert!(desc.replica_count > 0);
282 assert!(self.partition_state.replica_states.is_empty());
283
284 let mut replicas = HashMap::new();
285 let mut replicators = HashMap::new();
286 let mut replica_addresses = HashMap::new();
287 let mut replicator_addresses = HashMap::new();
288 let mut replica_infos = HashMap::new();
289 let mut partitions = HashMap::new();
290
291 for _ in 0..desc.replica_count {
292 let id = self.next_replica_index();
293 let (factory_index, factory) = self.get_round_robin_factory();
294 let replica = factory
295 .create_replica(
296 desc.service_type_name.clone(),
297 desc.service_name.clone(),
298 &desc.init_data,
299 desc.partition_id,
300 id,
301 )
302 .inspect_err(|e| {
303 tracing::error!("Failed to create stateful service replica: {:?}", e)
304 })?;
305 let prev = replicas.insert(id, (factory_index, replica));
306 assert!(prev.is_none(), "Service replica already exists");
307 }
308
309 for (id, (_, replica)) in &replicas {
311 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
312 let partition =
314 StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
315 mssf_core::types::SingletonPartitionInformation {
316 id: desc.partition_id,
317 },
318 ));
319 let replctr = replica
320 .open(
321 mssf_core::types::OpenMode::New,
322 Arc::new(partition.clone()),
323 cancellation_token,
324 )
325 .await?;
326 replicators.insert(*id, replctr);
327 partitions.insert(*id, partition);
328 }
329
330 for (id, replctr) in &replicators {
332 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
333
334 let replctr_addr = replctr.open(cancellation_token).await?;
335 replicator_addresses.insert(*id, replctr_addr);
336 }
337
338 let primary_index = 1;
340 let epoch = self.next_epoch_index();
341 for (id, rplctr) in &replicators {
342 let epoch_cp = epoch.clone();
343 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
344 if *id == primary_index {
345 rplctr
346 .change_role(
347 epoch_cp,
348 mssf_core::types::ReplicaRole::Primary,
349 cancellation_token,
350 )
351 .await?;
352 self.partition_state.primary_index = primary_index;
353 } else {
354 rplctr
355 .change_role(
356 epoch_cp,
357 mssf_core::types::ReplicaRole::IdleSecondary,
358 cancellation_token,
359 )
360 .await?;
361 }
362 }
363
364 for (id, (_, replica)) in &replicas {
366 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
367 let replica_addr = if *id == self.partition_state.primary_index {
368 replica
369 .change_role(mssf_core::types::ReplicaRole::Primary, cancellation_token)
370 .await?
371 } else {
372 replica
373 .change_role(
374 mssf_core::types::ReplicaRole::IdleSecondary,
375 cancellation_token,
376 )
377 .await?
378 };
379 replica_addresses.insert(*id, replica_addr);
380 }
381
382 let primary = replicators
384 .get(&self.partition_state.primary_index)
385 .unwrap();
386 for (id, (_, replica)) in &replicas {
387 if *id == self.partition_state.primary_index {
388 let replica_info = mssf_core::types::ReplicaInformation {
389 replicator_address: replicator_addresses.get(id).unwrap().clone(),
390 id: *id,
391 role: mssf_core::types::ReplicaRole::Primary,
392 status: mssf_core::types::ReplicaStatus::Up,
393 current_progress: -1, catch_up_capability: -1,
395 must_catch_up: false,
396 };
397 replica_infos.insert(*id, replica_info);
398 continue;
399 }
400 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
401 let replica_info = mssf_core::types::ReplicaInformation {
402 replicator_address: replicator_addresses.get(id).unwrap().clone(),
403 id: *id,
404 role: mssf_core::types::ReplicaRole::IdleSecondary,
405 status: mssf_core::types::ReplicaStatus::Up,
406 current_progress: -1,
407 catch_up_capability: -1,
408 must_catch_up: false,
409 };
410 replica_infos.insert(*id, replica_info.clone());
411 primary
412 .build_replica(replica_info, cancellation_token)
413 .await?;
414 replica
416 .change_role(
417 mssf_core::types::ReplicaRole::ActiveSecondary,
418 SimpleCancelToken::new_boxed(),
419 )
420 .await?;
421 replica_infos.get_mut(id).unwrap().role =
423 mssf_core::types::ReplicaRole::ActiveSecondary;
424 }
425
426 let mut new_config = mssf_core::types::ReplicaSetConfig {
428 replicas: vec![],
429 write_quorum: 1, };
431 let mut ready_replicas = 1;
432 for id in replicas.keys() {
433 if *id == self.partition_state.primary_index {
434 continue;
435 }
436 let prev_config = new_config.clone();
437 let replica_info = replica_infos.get(id).unwrap().clone();
439 new_config.replicas.push(replica_info);
440 ready_replicas += 1;
441 new_config.write_quorum = ready_replicas / 2 + 1_u32;
442
443 primary.update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
444
445 primary
447 .wait_for_catch_up_quorum(
448 mssf_core::types::ReplicaSetQuorumMode::Write,
449 SimpleCancelToken::new_boxed(),
450 )
451 .await?;
452 primary.update_current_replica_set_configuration(new_config.clone())?;
454 self.partition_state.current_configuration = new_config.clone();
455 }
456
457 for (id, partition) in &partitions {
462 if *id == self.partition_state.primary_index {
463 partition.set_read_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
464 partition.set_write_status(mssf_core::types::ServicePartitionAccessStatus::Granted);
465 } else {
466 partition
467 .set_read_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
468 partition
469 .set_write_status(mssf_core::types::ServicePartitionAccessStatus::NotPrimary);
470 }
471 }
472
473 for (id, (factory_index, replica)) in replicas {
475 let state = StatefulServiceReplicaState {
476 replica,
477 replicator: replicators.remove(&id).unwrap(),
478 _replica_address: replica_addresses.remove(&id).unwrap(),
479 _replicator_address: replicator_addresses.remove(&id).unwrap(),
480 partition: partitions.remove(&id).unwrap(),
481 factory_index,
482 };
483 self.partition_state.replica_states.insert(id, state);
484 }
485 self.partition_state.epoch = epoch;
486 self.partition_state.static_info = Some(desc.clone());
487
488 self.check_partition_state();
489 Ok(())
490 }
491
492 pub async fn delete_service_partition(&mut self) -> mssf_core::Result<()> {
494 for state in self.partition_state.replica_states.values_mut() {
498 state.partition.set_read_status(
499 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
500 );
501 state.partition.set_write_status(
502 mssf_core::types::ServicePartitionAccessStatus::ReconfigurationPending,
503 );
504 }
505
506 let primary = self
508 .partition_state
509 .replica_states
510 .get_mut(&1)
511 .expect("Primary replica not found");
512
513 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
514 primary
515 .replica
516 .change_role(
517 mssf_core::types::ReplicaRole::ActiveSecondary,
518 cancellation_token,
519 )
520 .await?;
521 primary
522 .replicator
523 .change_role(
524 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::ActiveSecondary,
526 SimpleCancelToken::new_boxed(),
527 )
528 .await?;
529
530 for state in self.partition_state.replica_states.values_mut() {
532 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
533 state
534 .replica
535 .change_role(mssf_core::types::ReplicaRole::None, cancellation_token)
536 .await?;
537 state
538 .replicator
539 .change_role(
540 self.partition_state.epoch.clone(), mssf_core::types::ReplicaRole::None,
542 SimpleCancelToken::new_boxed(),
543 )
544 .await?;
545 }
546
547 for state in self.partition_state.replica_states.values_mut() {
549 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
550 state.replica.close(cancellation_token.clone()).await?;
551 state.replicator.close(cancellation_token).await?;
552 }
553
554 self.partition_state.replica_states.clear();
556 self.partition_state.static_info = None;
557 self.partition_state.current_configuration = mssf_core::types::ReplicaSetConfig {
558 replicas: vec![],
559 write_quorum: 0,
560 };
561 self.check_partition_state();
562 Ok(())
563 }
564
565 pub async fn restart_secondary_graceful(&mut self, replica_id: i64) -> mssf_core::Result<()> {
567 {
569 self.partition_state
570 .replica_states
571 .get_mut(&replica_id)
572 .ok_or_else(|| {
573 mssf_core::Error::from(mssf_core::ErrorCode::FABRIC_E_REPLICA_DOES_NOT_EXIST)
574 })?;
575 if replica_id == self.partition_state.primary_index {
577 tracing::error!(
578 "Replica {} is primary, cannot restart as secondary",
579 replica_id
580 );
581 return Err(mssf_core::Error::from(
582 mssf_core::ErrorCode::FABRIC_E_INVALID_OPERATION,
583 ));
584 }
585 }
586
587 {
589 let primary = self.get_primary_state().unwrap();
590 let current_config = self.partition_state.current_configuration.clone();
591 let replica_count = current_config.replicas.len();
592 let new_replicas = current_config
593 .replicas
594 .iter()
595 .filter(|r| r.id != replica_id)
596 .cloned()
597 .collect::<Vec<_>>();
598 let write_quorum = (replica_count as u32) / 2 + 1; let new_config = mssf_core::types::ReplicaSetConfig {
600 replicas: new_replicas,
601 write_quorum,
602 };
603 primary
604 .replicator
605 .update_current_replica_set_configuration(new_config.clone())?;
606 self.partition_state.current_configuration = new_config;
607 }
608
609 let prev_state = self
610 .partition_state
611 .replica_states
612 .remove(&replica_id)
613 .unwrap();
614 let factory_index = prev_state.factory_index;
615 {
617 let cancellation_token = mssf_core::sync::SimpleCancelToken::new_boxed();
618 prev_state.replica.close(cancellation_token.clone()).await?;
619 prev_state.replicator.close(cancellation_token).await?;
620 drop(prev_state);
621 }
622
623 let factory = &*self.service_factory[factory_index as usize];
625 let replica = factory
626 .create_replica(
627 self.partition_state
628 .static_info
629 .as_ref()
630 .unwrap()
631 .service_type_name
632 .clone(),
633 self.partition_state
634 .static_info
635 .as_ref()
636 .unwrap()
637 .service_name
638 .clone(),
639 &self.partition_state.static_info.as_ref().unwrap().init_data,
640 self.partition_state
641 .static_info
642 .as_ref()
643 .unwrap()
644 .partition_id,
645 replica_id,
646 )
647 .inspect_err(|e| {
648 tracing::error!("Failed to create stateful service replica: {:?}", e)
649 })?;
650 let partition = StatefulServicePartitionMock::new(ServicePartitionInformation::Singleton(
652 mssf_core::types::SingletonPartitionInformation {
653 id: self
654 .partition_state
655 .static_info
656 .as_ref()
657 .unwrap()
658 .partition_id,
659 },
660 ));
661 let replctr = replica
663 .open(
664 mssf_core::types::OpenMode::Existing,
665 Arc::new(partition.clone()),
666 SimpleCancelToken::new_boxed(),
667 )
668 .await
669 .inspect_err(|e| tracing::error!("Fail to open replica {}", e))?;
670 let replctr_addr = replctr.open(SimpleCancelToken::new_boxed()).await?;
672 replctr
674 .change_role(
675 self.partition_state.epoch.clone(),
676 mssf_core::types::ReplicaRole::IdleSecondary,
677 SimpleCancelToken::new_boxed(),
678 )
679 .await?;
680 let replica_addr = replica
681 .change_role(
682 mssf_core::types::ReplicaRole::IdleSecondary,
683 SimpleCancelToken::new_boxed(),
684 )
685 .await?;
686
687 let primary = self.get_primary_state().unwrap();
689
690 let replica_info = mssf_core::types::ReplicaInformation {
691 replicator_address: replctr_addr.clone(),
692 id: replica_id,
693 role: mssf_core::types::ReplicaRole::IdleSecondary,
694 status: mssf_core::types::ReplicaStatus::Up,
695 current_progress: -1, catch_up_capability: -1,
697 must_catch_up: false,
698 };
699 primary
700 .replicator
701 .build_replica(replica_info.clone(), SimpleCancelToken::new_boxed())
702 .await?;
703
704 replica
706 .change_role(
707 mssf_core::types::ReplicaRole::ActiveSecondary,
708 SimpleCancelToken::new_boxed(),
709 )
710 .await?;
711 let mut updated_replica_info = replica_info.clone();
713 updated_replica_info.role = mssf_core::types::ReplicaRole::ActiveSecondary;
714 let prev_config = self.partition_state.current_configuration.clone();
716 let mut new_replicas = prev_config.replicas.clone();
717 new_replicas.push(updated_replica_info.clone());
718 let write_quorum = (new_replicas.len() as u32) / 2 + 1;
719 let new_config = mssf_core::types::ReplicaSetConfig {
720 replicas: new_replicas,
721 write_quorum,
722 };
723 primary
724 .replicator
725 .update_catch_up_replica_set_configuration(new_config.clone(), prev_config)?;
726 primary
728 .replicator
729 .wait_for_catch_up_quorum(
730 mssf_core::types::ReplicaSetQuorumMode::Write,
731 SimpleCancelToken::new_boxed(),
732 )
733 .await?;
734 primary
736 .replicator
737 .update_current_replica_set_configuration(new_config.clone())?;
738 self.partition_state.current_configuration = new_config;
739 let state = StatefulServiceReplicaState {
741 replica,
742 replicator: replctr,
743 _replica_address: replica_addr,
744 _replicator_address: replctr_addr,
745 partition,
746 factory_index,
747 };
748 let prev = self
749 .partition_state
750 .replica_states
751 .insert(replica_id, state);
752 assert!(prev.is_none(), "Service replica already exists");
753 self.check_partition_state();
755 Ok(())
756 }
757}