1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use tracing::{info, warn};
5
6use crate::error::{KubericError, Result};
7use crate::types::{
8 DataLossAction, Epoch, Lsn, OpenMode, ReplicaId, ReplicaInfo, ReplicaSetConfig,
9 ReplicaSetQuorumMode, ReplicaStatus, Role,
10};
11
12#[async_trait]
20pub trait ReplicaHandle: Send + Sync {
21 fn id(&self) -> ReplicaId;
22
23 async fn open(&self, mode: OpenMode) -> Result<()>;
25 async fn close(&self) -> Result<()>;
26 fn abort(&self);
27
28 async fn change_role(&self, epoch: Epoch, role: Role) -> Result<()>;
30 async fn update_epoch(&self, epoch: Epoch) -> Result<()>;
31
32 fn current_progress(&self) -> Lsn;
34 fn catch_up_capability(&self) -> Lsn;
35
36 async fn on_data_loss(&self) -> Result<DataLossAction>;
38 async fn update_catch_up_configuration(
39 &self,
40 current: ReplicaSetConfig,
41 previous: ReplicaSetConfig,
42 ) -> Result<()>;
43 async fn update_current_configuration(&self, current: ReplicaSetConfig) -> Result<()>;
44 async fn wait_for_catch_up_quorum(&self, mode: ReplicaSetQuorumMode) -> Result<()>;
45 async fn build_replica(&self, replica: ReplicaInfo) -> Result<()>;
46 async fn remove_replica(&self, replica_id: ReplicaId) -> Result<()>;
47
48 async fn revoke_write_status(&self) -> Result<()>;
52
53 fn replicator_address(&self) -> String;
55}
56
57pub struct PartitionDriver {
67 replicas: HashMap<ReplicaId, ReplicaState>,
68 primary_id: Option<ReplicaId>,
69 epoch: Epoch,
70 current_config: ReplicaSetConfig,
71}
72
73struct ReplicaState {
74 handle: Box<dyn ReplicaHandle>,
75 role: Role,
76}
77
78impl Default for PartitionDriver {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl PartitionDriver {
85 pub fn new() -> Self {
86 Self {
87 replicas: HashMap::new(),
88 primary_id: None,
89 epoch: Epoch::new(0, 0),
90 current_config: ReplicaSetConfig {
91 members: vec![],
92 write_quorum: 0,
93 },
94 }
95 }
96
97 fn next_epoch(&mut self) -> Epoch {
98 self.epoch.configuration_number += 1;
99 self.epoch
100 }
101
102 pub fn primary_id(&self) -> Option<ReplicaId> {
103 self.primary_id
104 }
105
106 pub fn epoch(&self) -> Epoch {
107 self.epoch
108 }
109
110 pub fn replica_ids(&self) -> Vec<ReplicaId> {
111 self.replicas.keys().cloned().collect()
112 }
113
114 pub fn handle(&self, id: ReplicaId) -> Option<&dyn ReplicaHandle> {
115 self.replicas.get(&id).map(|s| s.handle.as_ref())
116 }
117
118 pub fn remove_replica_from_driver(&mut self, id: ReplicaId) -> Option<Box<dyn ReplicaHandle>> {
122 self.replicas.remove(&id).map(|s| s.handle)
123 }
124
125 pub async fn create_partition(&mut self, handles: Vec<Box<dyn ReplicaHandle>>) -> Result<()> {
141 assert!(!handles.is_empty());
142 assert!(self.replicas.is_empty());
143
144 let epoch = self.next_epoch();
145
146 let ids: Vec<ReplicaId> = handles.iter().map(|h| h.id()).collect();
147 let primary_id = ids[0];
148 let secondary_ids: Vec<ReplicaId> = ids[1..].to_vec();
149
150 for handle in handles {
152 let id = handle.id();
153 self.replicas.insert(
154 id,
155 ReplicaState {
156 handle,
157 role: Role::None,
158 },
159 );
160 }
161
162 for &id in &ids {
164 self.replicas[&id].handle.open(OpenMode::New).await?;
165 }
166
167 self.replicas[&primary_id]
169 .handle
170 .change_role(epoch, Role::Primary)
171 .await?;
172 self.replicas.get_mut(&primary_id).unwrap().role = Role::Primary;
173 self.primary_id = Some(primary_id);
174
175 for &id in &secondary_ids {
177 let entry = &self.replicas[&id];
178 entry.handle.update_epoch(epoch).await?;
179 entry.handle.change_role(epoch, Role::IdleSecondary).await?;
180 self.replicas.get_mut(&id).unwrap().role = Role::IdleSecondary;
181 }
182
183 for &id in &secondary_ids {
185 let addr = self.replicas[&id].handle.replicator_address();
186 let replica_info = ReplicaInfo {
187 id,
188 role: Role::IdleSecondary,
189 status: ReplicaStatus::Up,
190 replicator_address: addr,
191 current_progress: -1,
192 catch_up_capability: -1,
193 must_catch_up: false,
194 };
195 self.replicas[&primary_id]
198 .handle
199 .build_replica(replica_info)
200 .await?;
201
202 self.replicas[&id]
204 .handle
205 .change_role(epoch, Role::ActiveSecondary)
206 .await?;
207 self.replicas.get_mut(&id).unwrap().role = Role::ActiveSecondary;
208 }
209
210 let mut config = ReplicaSetConfig {
212 members: vec![],
213 write_quorum: 1,
214 };
215 let mut ready_count: u32 = 1; for &id in &secondary_ids {
218 let prev_config = config.clone();
219 let addr = self.replicas[&id].handle.replicator_address();
220
221 config.members.push(ReplicaInfo {
222 id,
223 role: Role::ActiveSecondary,
224 status: ReplicaStatus::Up,
225 replicator_address: addr,
226 current_progress: 0,
227 catch_up_capability: 0,
228 must_catch_up: false,
229 });
230 ready_count += 1;
231 config.write_quorum = ready_count / 2 + 1;
232
233 self.replicas[&primary_id]
234 .handle
235 .update_catch_up_configuration(config.clone(), prev_config)
236 .await?;
237
238 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
240
241 self.replicas[&primary_id]
242 .handle
243 .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
244 .await?;
245
246 self.replicas[&primary_id]
247 .handle
248 .update_current_configuration(config.clone())
249 .await?;
250 }
251
252 self.current_config = config;
253
254 info!(
257 primary = primary_id,
258 secondaries = ?secondary_ids,
259 epoch = ?self.epoch,
260 write_quorum = self.current_config.write_quorum,
261 "partition created"
262 );
263
264 Ok(())
265 }
266
267 pub async fn delete_partition(&mut self) -> Result<()> {
273 if let Some(pid) = self.primary_id {
275 self.replicas[&pid]
276 .handle
277 .change_role(self.epoch, Role::ActiveSecondary)
278 .await?;
279 }
280
281 for entry in self.replicas.values() {
283 entry.handle.change_role(self.epoch, Role::None).await?;
284 }
285
286 for entry in self.replicas.values() {
288 entry.handle.close().await?;
289 }
290
291 self.replicas.clear();
292 self.primary_id = None;
293 self.current_config = ReplicaSetConfig {
294 members: vec![],
295 write_quorum: 0,
296 };
297
298 info!("partition deleted");
299 Ok(())
300 }
301
302 pub async fn failover(&mut self, failed_primary_id: ReplicaId) -> Result<()> {
316 assert_eq!(
317 Some(failed_primary_id),
318 self.primary_id,
319 "can only failover the current primary"
320 );
321
322 let new_epoch = self.next_epoch();
323 info!(failed = failed_primary_id, ?new_epoch, "starting failover");
324
325 self.replicas.remove(&failed_primary_id);
327 self.primary_id = None;
328
329 if self.replicas.is_empty() {
330 return Err(KubericError::Internal(
331 "no surviving replicas for failover".into(),
332 ));
333 }
334
335 let new_primary_id = self
337 .replicas
338 .values()
339 .max_by_key(|e| e.handle.current_progress())
340 .map(|e| e.handle.id())
341 .unwrap();
342
343 info!(
344 new_primary = new_primary_id,
345 lsn = self.replicas[&new_primary_id].handle.current_progress(),
346 "selected new primary"
347 );
348
349 self.replicas[&new_primary_id]
353 .handle
354 .change_role(new_epoch, Role::Primary)
355 .await?;
356 self.replicas.get_mut(&new_primary_id).unwrap().role = Role::Primary;
357 self.primary_id = Some(new_primary_id);
358
359 for (&id, entry) in &self.replicas {
364 if id != new_primary_id && entry.handle.update_epoch(new_epoch).await.is_err() {
365 warn!(
366 replica_id = id,
367 "failed to update epoch on secondary (will be rebuilt)"
368 );
369 }
370 }
371
372 let secondary_ids: Vec<ReplicaId> = self
374 .replicas
375 .keys()
376 .filter(|&&id| id != new_primary_id)
377 .cloned()
378 .collect();
379
380 let total_count = self.replicas.len() as u32;
381 let write_quorum = total_count / 2 + 1;
382
383 let members: Vec<ReplicaInfo> = secondary_ids
384 .iter()
385 .map(|&id| {
386 let entry = &self.replicas[&id];
387 ReplicaInfo {
388 id,
389 role: Role::ActiveSecondary,
390 status: ReplicaStatus::Up,
391 replicator_address: entry.handle.replicator_address(),
392 current_progress: entry.handle.current_progress(),
393 catch_up_capability: entry.handle.catch_up_capability(),
394 must_catch_up: false,
395 }
396 })
397 .collect();
398
399 let new_config = ReplicaSetConfig {
400 members,
401 write_quorum,
402 };
403
404 self.replicas[&new_primary_id]
406 .handle
407 .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
408 .await?;
409
410 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
411
412 self.replicas[&new_primary_id]
413 .handle
414 .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
415 .await?;
416
417 self.replicas[&new_primary_id]
418 .handle
419 .update_current_configuration(new_config.clone())
420 .await?;
421
422 self.current_config = new_config;
423
424 info!(
425 new_primary = new_primary_id,
426 epoch = ?self.epoch,
427 "failover complete"
428 );
429
430 Ok(())
431 }
432
433 pub async fn switchover(&mut self, target_id: ReplicaId) -> Result<()> {
446 let old_primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
447
448 assert_ne!(
449 old_primary_id, target_id,
450 "target must differ from current primary"
451 );
452 assert!(
453 self.replicas.contains_key(&target_id),
454 "target must be a known replica"
455 );
456
457 let new_epoch = self.next_epoch();
458 info!(
459 old_primary = old_primary_id,
460 new_primary = target_id,
461 ?new_epoch,
462 "starting switchover"
463 );
464
465 self.replicas[&old_primary_id]
468 .handle
469 .revoke_write_status()
470 .await?;
471
472 self.replicas[&old_primary_id]
474 .handle
475 .change_role(new_epoch, Role::ActiveSecondary)
476 .await?;
477 self.replicas.get_mut(&old_primary_id).unwrap().role = Role::ActiveSecondary;
478
479 let promote_result = tokio::time::timeout(
483 std::time::Duration::from_secs(5),
484 self.replicas[&target_id]
485 .handle
486 .change_role(new_epoch, Role::Primary),
487 )
488 .await;
489
490 let promote_err = match promote_result {
491 Ok(Ok(())) => None,
492 Ok(Err(e)) => Some(e),
493 Err(_) => Some(KubericError::Internal("promotion timed out".into())),
494 };
495
496 if let Some(e) = promote_err {
497 warn!(
498 target_id,
499 error = %e,
500 "target promotion failed, rolling back — re-promoting old primary"
501 );
502 self.replicas[&old_primary_id]
503 .handle
504 .change_role(new_epoch, Role::Primary)
505 .await?;
506 self.replicas.get_mut(&old_primary_id).unwrap().role = Role::Primary;
507 self.primary_id = Some(old_primary_id);
508 return Err(e);
509 }
510 self.replicas.get_mut(&target_id).unwrap().role = Role::Primary;
511 self.primary_id = Some(target_id);
512
513 for (&id, entry) in &self.replicas {
518 if id != old_primary_id
519 && id != target_id
520 && entry.handle.update_epoch(new_epoch).await.is_err()
521 {
522 warn!(
523 replica_id = id,
524 "failed to update epoch on secondary (will be rebuilt)"
525 );
526 }
527 }
528
529 let secondary_ids: Vec<ReplicaId> = self
531 .replicas
532 .keys()
533 .filter(|&&id| id != target_id)
534 .cloned()
535 .collect();
536
537 let total_count = self.replicas.len() as u32;
538 let write_quorum = total_count / 2 + 1;
539
540 let members: Vec<ReplicaInfo> = secondary_ids
541 .iter()
542 .map(|&id| {
543 let entry = &self.replicas[&id];
544 ReplicaInfo {
545 id,
546 role: Role::ActiveSecondary,
547 status: ReplicaStatus::Up,
548 replicator_address: entry.handle.replicator_address(),
549 current_progress: entry.handle.current_progress(),
550 catch_up_capability: entry.handle.catch_up_capability(),
551 must_catch_up: false,
552 }
553 })
554 .collect();
555
556 let new_config = ReplicaSetConfig {
557 members,
558 write_quorum,
559 };
560
561 self.replicas[&target_id]
562 .handle
563 .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
564 .await?;
565
566 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
567
568 self.replicas[&target_id]
569 .handle
570 .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
571 .await?;
572
573 self.replicas[&target_id]
574 .handle
575 .update_current_configuration(new_config.clone())
576 .await?;
577
578 self.current_config = new_config;
579
580 info!(
581 new_primary = target_id,
582 epoch = ?self.epoch,
583 "switchover complete"
584 );
585
586 Ok(())
587 }
588
589 pub async fn remove_secondary(
601 &mut self,
602 secondary_id: ReplicaId,
603 min_replicas: usize,
604 ) -> Result<()> {
605 let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
606 assert_ne!(
607 secondary_id, primary_id,
608 "cannot remove the primary — use switchover first"
609 );
610 assert!(
611 self.replicas.contains_key(&secondary_id),
612 "replica {} not found",
613 secondary_id
614 );
615 assert!(
616 self.replicas.len() > min_replicas,
617 "cannot scale below min_replicas ({})",
618 min_replicas
619 );
620
621 info!(secondary_id, "removing secondary (scale-down)");
622
623 let secondary_ids: Vec<ReplicaId> = self
625 .replicas
626 .keys()
627 .filter(|&&id| id != primary_id && id != secondary_id)
628 .cloned()
629 .collect();
630
631 let total_count = (self.replicas.len() - 1) as u32; let write_quorum = total_count / 2 + 1;
633
634 let members: Vec<ReplicaInfo> = secondary_ids
635 .iter()
636 .map(|&id| {
637 let entry = &self.replicas[&id];
638 ReplicaInfo {
639 id,
640 role: Role::ActiveSecondary,
641 status: ReplicaStatus::Up,
642 replicator_address: entry.handle.replicator_address(),
643 current_progress: entry.handle.current_progress(),
644 catch_up_capability: entry.handle.catch_up_capability(),
645 must_catch_up: false,
646 }
647 })
648 .collect();
649
650 let new_config = ReplicaSetConfig {
651 members,
652 write_quorum,
653 };
654
655 self.replicas[&primary_id]
656 .handle
657 .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
658 .await?;
659
660 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
661
662 self.replicas[&primary_id]
663 .handle
664 .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
665 .await?;
666
667 self.replicas[&primary_id]
668 .handle
669 .update_current_configuration(new_config.clone())
670 .await?;
671
672 self.current_config = new_config;
673
674 let removed = self.replicas.remove(&secondary_id).unwrap();
676 let _ = removed.handle.change_role(self.epoch, Role::None).await;
677 let _ = removed.handle.close().await;
678
679 info!(
680 secondary_id,
681 remaining = self.replicas.len(),
682 "secondary removed"
683 );
684 Ok(())
685 }
686
687 pub async fn add_replica(&mut self, handle: Box<dyn ReplicaHandle>) -> Result<()> {
704 let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
705 let replica_id = handle.id();
706
707 assert_ne!(
708 replica_id, primary_id,
709 "cannot add the primary as a secondary"
710 );
711 assert!(
712 !self.replicas.contains_key(&replica_id),
713 "replica {} already exists — use restart_secondary to replace",
714 replica_id
715 );
716
717 let epoch = self.epoch;
718 info!(replica_id, ?epoch, "adding replica");
719
720 self.replicas.insert(
722 replica_id,
723 ReplicaState {
724 handle,
725 role: Role::None,
726 },
727 );
728
729 let h = &self.replicas[&replica_id].handle;
731 h.open(OpenMode::New).await?;
732 h.update_epoch(epoch).await?;
733 h.change_role(epoch, Role::IdleSecondary).await?;
734 self.replicas.get_mut(&replica_id).unwrap().role = Role::IdleSecondary;
735
736 let addr = self.replicas[&replica_id].handle.replicator_address();
738 let replica_info = ReplicaInfo {
739 id: replica_id,
740 role: Role::IdleSecondary,
741 status: ReplicaStatus::Up,
742 replicator_address: addr,
743 current_progress: -1,
744 catch_up_capability: -1,
745 must_catch_up: false,
746 };
747 self.replicas[&primary_id]
748 .handle
749 .build_replica(replica_info)
750 .await?;
751
752 self.replicas[&replica_id]
754 .handle
755 .change_role(epoch, Role::ActiveSecondary)
756 .await?;
757 self.replicas.get_mut(&replica_id).unwrap().role = Role::ActiveSecondary;
758
759 self.reconfigure_quorum(primary_id, Some(replica_id))
761 .await?;
762
763 info!(replica_id, "replica added");
764 Ok(())
765 }
766
767 pub async fn restart_secondary(
775 &mut self,
776 secondary_id: ReplicaId,
777 new_handle: Box<dyn ReplicaHandle>,
778 ) -> Result<()> {
779 let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
780 assert_ne!(
781 secondary_id, primary_id,
782 "cannot restart the primary with restart_secondary"
783 );
784 assert!(
785 self.replicas.contains_key(&secondary_id),
786 "replica {} not found — use add_replica for new replicas",
787 secondary_id
788 );
789
790 info!(secondary_id, "restarting secondary");
791
792 if let Some(old) = self.replicas.get(&secondary_id) {
794 let _ = old.handle.close().await;
795 }
796
797 self.replicas.remove(&secondary_id);
799
800 assert_eq!(new_handle.id(), secondary_id);
802 self.add_replica(new_handle).await
803 }
804
805 async fn reconfigure_quorum(
810 &mut self,
811 primary_id: ReplicaId,
812 must_catch_up_id: Option<ReplicaId>,
813 ) -> Result<()> {
814 let secondary_ids: Vec<ReplicaId> = self
815 .replicas
816 .keys()
817 .filter(|&&id| id != primary_id)
818 .cloned()
819 .collect();
820
821 let total_count = self.replicas.len() as u32;
822 let write_quorum = total_count / 2 + 1;
823
824 let members: Vec<ReplicaInfo> = secondary_ids
825 .iter()
826 .map(|&id| {
827 let entry = &self.replicas[&id];
828 ReplicaInfo {
829 id,
830 role: Role::ActiveSecondary,
831 status: ReplicaStatus::Up,
832 replicator_address: entry.handle.replicator_address(),
833 current_progress: entry.handle.current_progress(),
834 catch_up_capability: entry.handle.catch_up_capability(),
835 must_catch_up: must_catch_up_id == Some(id),
836 }
837 })
838 .collect();
839
840 let new_config = ReplicaSetConfig {
841 members,
842 write_quorum,
843 };
844
845 self.replicas[&primary_id]
846 .handle
847 .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
848 .await?;
849
850 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
851
852 self.replicas[&primary_id]
853 .handle
854 .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
855 .await?;
856
857 self.replicas[&primary_id]
858 .handle
859 .update_current_configuration(new_config.clone())
860 .await?;
861
862 self.current_config = new_config;
863 Ok(())
864 }
865}
866
#[cfg(any(test, feature = "testing"))]
pub mod testing {
    //! In-process [`ReplicaHandle`] implementation. Each replica runs a
    //! `WalReplicatorActor` task plus a loopback gRPC `ReplicatorData` server,
    //! so a whole partition can be driven inside one process.

    use super::*;
    use std::sync::Arc;

    use tokio::sync::{mpsc, oneshot};
    use tonic::transport::Server;

    use crate::events::{ReplicateRequest, ReplicatorControlEvent};
    use crate::handles::{PartitionState, StateReplicatorHandle};
    use crate::proto::replicator_data_server::ReplicatorDataServer;
    use crate::replicator::actor::WalReplicatorActor;
    use crate::replicator::secondary::{SecondaryReceiver, SecondaryState};
    use crate::types::{AccessStatus, CancellationToken};

    /// [`ReplicaHandle`] for a replica running in this process.
    ///
    /// Control operations are forwarded to the replica's `WalReplicatorActor`
    /// over `control_tx`. The replica also serves `ReplicatorData` over gRPC on
    /// an ephemeral loopback port.
    pub struct InProcessReplicaHandle {
        id: ReplicaId,
        /// Control channel to the replicator actor.
        control_tx: mpsc::Sender<ReplicatorControlEvent>,
        /// Data channel to the actor, exposed via `state_replicator`.
        data_tx: mpsc::Sender<ReplicateRequest>,
        /// Partition state shared with the actor (access status, progress).
        state: Arc<PartitionState>,
        /// State shared with this replica's gRPC receiver.
        pub secondary_state: Arc<SecondaryState>,
        /// `http://` URL of this replica's gRPC server.
        grpc_address: String,
        /// Cancelled by `close`/`abort`; its child token stops the gRPC server.
        shutdown_token: CancellationToken,
        _actor_handle: tokio::task::JoinHandle<()>,
        _grpc_handle: tokio::task::JoinHandle<()>,
    }

    impl InProcessReplicaHandle {
        /// Spawns replica `id`.
        ///
        /// Binds a gRPC server to an ephemeral `127.0.0.1` port and starts the
        /// replica's `WalReplicatorActor` task.
        ///
        /// # Errors
        /// Returns `KubericError::Internal` if the listener cannot be bound or
        /// its local address cannot be read.
        pub async fn spawn(id: ReplicaId) -> Result<Self> {
            let (control_tx, control_rx) = mpsc::channel(16);
            let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(256);
            let state = Arc::new(PartitionState::new());
            let secondary_state = Arc::new(SecondaryState::new());
            let shutdown_token = CancellationToken::new();

            let receiver = SecondaryReceiver::new(secondary_state.clone());
            let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
                .await
                .map_err(|e| KubericError::Internal(Box::new(e)))?;
            // Report an I/O failure as an error instead of panicking.
            let addr = listener
                .local_addr()
                .map_err(|e| KubericError::Internal(Box::new(e)))?;
            let grpc_address = format!("http://{}", addr);

            let grpc_shutdown = shutdown_token.child_token();
            let grpc_handle = tokio::spawn(async move {
                let _ = Server::builder()
                    .add_service(ReplicatorDataServer::new(receiver))
                    .serve_with_incoming_shutdown(
                        tokio_stream::wrappers::TcpListenerStream::new(listener),
                        grpc_shutdown.cancelled(),
                    )
                    .await;
            });

            let actor = WalReplicatorActor::new(id);
            let state_cp = state.clone();
            // NOTE(review): `_sp_rx` is dropped when `spawn` returns, closing
            // the receiving side of `sp_tx` — confirm the actor tolerates that.
            let (sp_tx, _sp_rx) = mpsc::unbounded_channel();
            let actor_handle = tokio::spawn(async move {
                actor.run(control_rx, data_rx, state_cp, sp_tx).await;
            });

            Ok(Self {
                id,
                control_tx,
                data_tx,
                state,
                secondary_state,
                grpc_address,
                shutdown_token,
                _actor_handle: actor_handle,
                _grpc_handle: grpc_handle,
            })
        }

        /// Sends the control event built by `make` and awaits its reply.
        ///
        /// `make` receives the reply sender to embed in the event.
        ///
        /// # Errors
        /// Returns `KubericError::Closed` if the actor's control channel is
        /// closed or the reply sender is dropped. Otherwise returns whatever
        /// error the actor replies with.
        async fn send_control<T>(
            &self,
            make: impl FnOnce(oneshot::Sender<Result<T>>) -> ReplicatorControlEvent,
        ) -> Result<T> {
            let (tx, rx) = oneshot::channel();
            self.control_tx
                .send(make(tx))
                .await
                .map_err(|_| KubericError::Closed)?;
            rx.await.map_err(|_| KubericError::Closed)?
        }

        /// Returns a `StateReplicatorHandle` that submits data through this
        /// replica's data channel and shares its partition state.
        pub fn state_replicator(&self) -> StateReplicatorHandle {
            StateReplicatorHandle::new(self.data_tx.clone(), self.state.clone())
        }
    }

    #[async_trait]
    impl ReplicaHandle for InProcessReplicaHandle {
        fn id(&self) -> ReplicaId {
            self.id
        }

        async fn open(&self, mode: OpenMode) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::Open { mode, reply })
                .await
        }

        /// Asks the actor to close, then stops the gRPC server regardless of
        /// the close result.
        async fn close(&self) -> Result<()> {
            let result = self
                .send_control(|reply| ReplicatorControlEvent::Close { reply })
                .await;
            self.shutdown_token.cancel();
            result
        }

        /// Best-effort abort: the `Abort` event is dropped if the control
        /// channel is full or closed. The gRPC server is stopped either way.
        fn abort(&self) {
            let _ = self.control_tx.try_send(ReplicatorControlEvent::Abort);
            self.shutdown_token.cancel();
        }

        /// Updates the receiver's epoch, forwards the role change to the
        /// actor, and then sets read/write access to `Granted` for `Primary`
        /// and `NotPrimary` for every other role.
        async fn change_role(&self, epoch: Epoch, role: Role) -> Result<()> {
            self.secondary_state.update_epoch(epoch);
            self.send_control(|reply| ReplicatorControlEvent::ChangeRole { epoch, role, reply })
                .await?;
            match role {
                Role::Primary => {
                    self.state.set_read_status(AccessStatus::Granted);
                    self.state.set_write_status(AccessStatus::Granted);
                }
                _ => {
                    self.state.set_read_status(AccessStatus::NotPrimary);
                    self.state.set_write_status(AccessStatus::NotPrimary);
                }
            }
            Ok(())
        }

        async fn update_epoch(&self, epoch: Epoch) -> Result<()> {
            self.secondary_state.update_epoch(epoch);
            self.send_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
                .await
        }

        fn current_progress(&self) -> Lsn {
            self.state.current_progress()
        }

        fn catch_up_capability(&self) -> Lsn {
            self.state.catch_up_capability()
        }

        async fn on_data_loss(&self) -> Result<DataLossAction> {
            self.send_control(|reply| ReplicatorControlEvent::OnDataLoss { reply })
                .await
        }

        async fn update_catch_up_configuration(
            &self,
            current: ReplicaSetConfig,
            previous: ReplicaSetConfig,
        ) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::UpdateCatchUpConfiguration {
                current,
                previous,
                reply,
            })
            .await
        }

        async fn update_current_configuration(&self, current: ReplicaSetConfig) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::UpdateCurrentConfiguration {
                current,
                reply,
            })
            .await
        }

        async fn wait_for_catch_up_quorum(&self, mode: ReplicaSetQuorumMode) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply })
                .await
        }

        async fn build_replica(&self, replica: ReplicaInfo) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::BuildReplica { replica, reply })
                .await
        }

        async fn remove_replica(&self, replica_id: ReplicaId) -> Result<()> {
            self.send_control(|reply| ReplicatorControlEvent::RemoveReplica { replica_id, reply })
                .await
        }

        /// Marks writes as `ReconfigurationPending` locally; the actor is not contacted.
        async fn revoke_write_status(&self) -> Result<()> {
            self.state
                .set_write_status(AccessStatus::ReconfigurationPending);
            Ok(())
        }

        fn replicator_address(&self) -> String {
            self.grpc_address.clone()
        }
    }

    /// Spawns `count` in-process replicas with ids `1..=count`, boxed as
    /// [`ReplicaHandle`] trait objects.
    ///
    /// # Errors
    /// Returns the first error from [`InProcessReplicaHandle::spawn`].
    pub async fn spawn_replicas(count: usize) -> Result<Vec<Box<dyn ReplicaHandle>>> {
        let mut handles: Vec<Box<dyn ReplicaHandle>> = Vec::with_capacity(count);
        for i in 1..=(count as ReplicaId) {
            handles.push(Box::new(InProcessReplicaHandle::spawn(i).await?));
        }
        Ok(handles)
    }
}