Skip to main content

kuberic_core/
driver.rs

1use std::collections::HashMap;
2
3use async_trait::async_trait;
4use tracing::{info, warn};
5
6use crate::error::{KubericError, Result};
7use crate::types::{
8    DataLossAction, Epoch, Lsn, OpenMode, ReplicaId, ReplicaInfo, ReplicaSetConfig,
9    ReplicaSetQuorumMode, ReplicaStatus, Role,
10};
11
12// ---------------------------------------------------------------------------
13// ReplicaHandle trait — abstraction over how we talk to a replica
14// ---------------------------------------------------------------------------
15
/// Handle for communicating with a single replica's replicator.
///
/// Tests implement this via in-process channels; the operator implements it
/// via gRPC to a remote pod. `PartitionDriver` only ever reaches a replica
/// through this trait, which is why implementors must be `Send + Sync`.
#[async_trait]
pub trait ReplicaHandle: Send + Sync {
    /// Identifier of the replica behind this handle. The driver uses it as
    /// the key under which the handle is tracked.
    fn id(&self) -> ReplicaId;

    // Lifecycle
    /// Open the replica in the given mode. The driver calls this (with
    /// `OpenMode::New`) before assigning any role.
    async fn open(&self, mode: OpenMode) -> Result<()>;
    /// Close the replica. The driver calls this on partition deletion,
    /// scale-down, and before replacing a restarted secondary.
    async fn close(&self) -> Result<()>;
    /// Not invoked by the driver workflows in this file.
    /// NOTE(review): presumably an immediate, non-graceful teardown —
    /// confirm with implementors.
    fn abort(&self);

    // Role management
    /// Move the replica to `role` under `epoch`.
    async fn change_role(&self, epoch: Epoch, role: Role) -> Result<()>;
    /// Deliver `epoch` to the replica without changing its role.
    async fn update_epoch(&self, epoch: Epoch) -> Result<()>;

    // Progress (for primary selection)
    /// Progress LSN reported by the replica. During failover the driver
    /// promotes the surviving replica with the highest value.
    fn current_progress(&self) -> Lsn;
    /// Reported next to `current_progress` in the `ReplicaInfo` entries the
    /// driver builds for a new configuration.
    fn catch_up_capability(&self) -> Lsn;

    // Primary-only reconfiguration
    /// NOTE(review): not called by the driver workflows in this file;
    /// semantics are defined by implementors.
    async fn on_data_loss(&self) -> Result<DataLossAction>;
    /// First step of a reconfiguration: hand the primary the new (`current`)
    /// and the old (`previous`) configuration.
    async fn update_catch_up_configuration(
        &self,
        current: ReplicaSetConfig,
        previous: ReplicaSetConfig,
    ) -> Result<()>;
    /// Final step of a reconfiguration: install `current` as the only
    /// configuration. The driver calls it after `wait_for_catch_up_quorum`.
    async fn update_current_configuration(&self, current: ReplicaSetConfig) -> Result<()>;
    /// Wait until a catch-up quorum is reached for `mode`. The driver always
    /// passes `ReplicaSetQuorumMode::Write`.
    async fn wait_for_catch_up_quorum(&self, mode: ReplicaSetQuorumMode) -> Result<()>;
    /// Ask the primary to build `replica` (the driver passes an idle
    /// secondary's info, including its replicator address).
    async fn build_replica(&self, replica: ReplicaInfo) -> Result<()>;
    /// NOTE(review): not called by the driver workflows in this file;
    /// presumably tells the primary to forget `replica_id` — confirm.
    async fn remove_replica(&self, replica_id: ReplicaId) -> Result<()>;

    /// Revoke write status before switchover demotion.
    /// Sets write_status = ReconfigurationPending so new writes are
    /// immediately rejected. In-flight writes continue to completion.
    async fn revoke_write_status(&self) -> Result<()>;

    /// The gRPC address where this replica's replication server listens.
    fn replicator_address(&self) -> String;
}
56
57// ---------------------------------------------------------------------------
58// PartitionDriver — pure workflow orchestrator
59// ---------------------------------------------------------------------------
60
/// Workflow driver that encodes the correct SF-style lifecycle sequences
/// for a partition. Operates on `ReplicaHandle` trait objects — agnostic
/// to whether replicas are in-process or remote.
///
/// Mirrors `StatefulServicePartitionDriver` from service-fabric-rs.
pub struct PartitionDriver {
    /// Every replica the driver tracks, keyed by id, together with the role
    /// the driver last assigned to it.
    replicas: HashMap<ReplicaId, ReplicaState>,
    /// Replica currently treated as primary. `None` on a fresh driver, after
    /// `delete_partition`, and while a failover is selecting a successor.
    primary_id: Option<ReplicaId>,
    /// Current epoch; `next_epoch` bumps its `configuration_number` at the
    /// start of create, failover and switchover.
    epoch: Epoch,
    /// Configuration most recently installed on the primary. It is passed as
    /// the `previous` configuration on the next reconfiguration.
    current_config: ReplicaSetConfig,
}
72
/// Per-replica bookkeeping kept by `PartitionDriver`.
struct ReplicaState {
    /// Channel to the replica (in-process or remote).
    handle: Box<dyn ReplicaHandle>,
    /// Role the driver last assigned through `change_role`. This is the
    /// driver's view only; it is updated after the call succeeds.
    role: Role,
}
77
/// `Default` simply delegates to `PartitionDriver::new`, so a defaulted
/// driver is empty: no replicas and no primary.
impl Default for PartitionDriver {
    fn default() -> Self {
        Self::new()
    }
}
83
84impl PartitionDriver {
85    pub fn new() -> Self {
86        Self {
87            replicas: HashMap::new(),
88            primary_id: None,
89            epoch: Epoch::new(0, 0),
90            current_config: ReplicaSetConfig {
91                members: vec![],
92                write_quorum: 0,
93            },
94        }
95    }
96
97    fn next_epoch(&mut self) -> Epoch {
98        self.epoch.configuration_number += 1;
99        self.epoch
100    }
101
    /// Id of the replica the driver currently treats as primary, or `None`
    /// when no primary is assigned.
    pub fn primary_id(&self) -> Option<ReplicaId> {
        self.primary_id
    }
105
    /// The driver's current epoch (returned by value).
    pub fn epoch(&self) -> Epoch {
        self.epoch
    }
109
110    pub fn replica_ids(&self) -> Vec<ReplicaId> {
111        self.replicas.keys().cloned().collect()
112    }
113
114    pub fn handle(&self, id: ReplicaId) -> Option<&dyn ReplicaHandle> {
115        self.replicas.get(&id).map(|s| s.handle.as_ref())
116    }
117
118    /// Remove a replica from the driver's tracking without notifying
119    /// the primary's replicator. Used when the reconciler detects a pod
120    /// is permanently dead before failover. Returns the handle for cleanup.
121    pub fn remove_replica_from_driver(&mut self, id: ReplicaId) -> Option<Box<dyn ReplicaHandle>> {
122        self.replicas.remove(&id).map(|s| s.handle)
123    }
124
125    // -----------------------------------------------------------------------
126    // Workflow: Create Partition
127    // -----------------------------------------------------------------------
128
129    /// Create a partition from pre-created replica handles.
130    /// The first handle becomes primary; the rest become secondaries.
131    ///
132    /// Follows the exact SF workflow:
133    /// 1. Open all replicators
134    /// 2. Assign primary role (replicator first)
135    /// 3. Assign idle role to secondaries
136    /// 4. build_replica for each secondary
137    /// 5. Promote each secondary to active
138    /// 6. Update configuration incrementally
139    /// 7. Set access status
140    pub async fn create_partition(&mut self, handles: Vec<Box<dyn ReplicaHandle>>) -> Result<()> {
141        assert!(!handles.is_empty());
142        assert!(self.replicas.is_empty());
143
144        let epoch = self.next_epoch();
145
146        let ids: Vec<ReplicaId> = handles.iter().map(|h| h.id()).collect();
147        let primary_id = ids[0];
148        let secondary_ids: Vec<ReplicaId> = ids[1..].to_vec();
149
150        // Store handles
151        for handle in handles {
152            let id = handle.id();
153            self.replicas.insert(
154                id,
155                ReplicaState {
156                    handle,
157                    role: Role::None,
158                },
159            );
160        }
161
162        // 1. Open all replicators
163        for &id in &ids {
164            self.replicas[&id].handle.open(OpenMode::New).await?;
165        }
166
167        // 2. Assign roles to replicators (replicator BEFORE status set)
168        self.replicas[&primary_id]
169            .handle
170            .change_role(epoch, Role::Primary)
171            .await?;
172        self.replicas.get_mut(&primary_id).unwrap().role = Role::Primary;
173        self.primary_id = Some(primary_id);
174
175        // 3. Secondaries → Idle
176        for &id in &secondary_ids {
177            let entry = &self.replicas[&id];
178            entry.handle.update_epoch(epoch).await?;
179            entry.handle.change_role(epoch, Role::IdleSecondary).await?;
180            self.replicas.get_mut(&id).unwrap().role = Role::IdleSecondary;
181        }
182
183        // 4. Build each secondary via primary, then promote
184        for &id in &secondary_ids {
185            let addr = self.replicas[&id].handle.replicator_address();
186            let replica_info = ReplicaInfo {
187                id,
188                role: Role::IdleSecondary,
189                status: ReplicaStatus::Up,
190                replicator_address: addr,
191                current_progress: -1,
192                catch_up_capability: -1,
193                must_catch_up: false,
194            };
195            // Primary handles the full copy protocol internally
196            // (connects to secondary's data plane, runs GetCopyContext + CopyStream)
197            self.replicas[&primary_id]
198                .handle
199                .build_replica(replica_info)
200                .await?;
201
202            // 5. Promote idle → active
203            self.replicas[&id]
204                .handle
205                .change_role(epoch, Role::ActiveSecondary)
206                .await?;
207            self.replicas.get_mut(&id).unwrap().role = Role::ActiveSecondary;
208        }
209
210        // 6. Update configuration incrementally
211        let mut config = ReplicaSetConfig {
212            members: vec![],
213            write_quorum: 1,
214        };
215        let mut ready_count: u32 = 1; // Primary
216
217        for &id in &secondary_ids {
218            let prev_config = config.clone();
219            let addr = self.replicas[&id].handle.replicator_address();
220
221            config.members.push(ReplicaInfo {
222                id,
223                role: Role::ActiveSecondary,
224                status: ReplicaStatus::Up,
225                replicator_address: addr,
226                current_progress: 0,
227                catch_up_capability: 0,
228                must_catch_up: false,
229            });
230            ready_count += 1;
231            config.write_quorum = ready_count / 2 + 1;
232
233            self.replicas[&primary_id]
234                .handle
235                .update_catch_up_configuration(config.clone(), prev_config)
236                .await?;
237
238            // Give gRPC connections time to establish (in-process only)
239            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
240
241            self.replicas[&primary_id]
242                .handle
243                .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
244                .await?;
245
246            self.replicas[&primary_id]
247                .handle
248                .update_current_configuration(config.clone())
249                .await?;
250        }
251
252        self.current_config = config;
253
254        // Access status is set by each pod's PodRuntime during change_role()
255
256        info!(
257            primary = primary_id,
258            secondaries = ?secondary_ids,
259            epoch = ?self.epoch,
260            write_quorum = self.current_config.write_quorum,
261            "partition created"
262        );
263
264        Ok(())
265    }
266
267    // -----------------------------------------------------------------------
268    // Workflow: Delete Partition
269    // -----------------------------------------------------------------------
270
271    /// Gracefully shut down all replicas.
272    pub async fn delete_partition(&mut self) -> Result<()> {
273        // 1. Demote primary
274        if let Some(pid) = self.primary_id {
275            self.replicas[&pid]
276                .handle
277                .change_role(self.epoch, Role::ActiveSecondary)
278                .await?;
279        }
280
281        // 2. Change all to None
282        for entry in self.replicas.values() {
283            entry.handle.change_role(self.epoch, Role::None).await?;
284        }
285
286        // 3. Close all
287        for entry in self.replicas.values() {
288            entry.handle.close().await?;
289        }
290
291        self.replicas.clear();
292        self.primary_id = None;
293        self.current_config = ReplicaSetConfig {
294            members: vec![],
295            write_quorum: 0,
296        };
297
298        info!("partition deleted");
299        Ok(())
300    }
301
302    // -----------------------------------------------------------------------
303    // Workflow: Failover (unplanned primary failure)
304    // -----------------------------------------------------------------------
305
306    /// Failover after the primary has failed. The failed primary's handle
307    /// may be unreachable — the driver does not call it.
308    ///
309    /// Matches SF's reconfiguration phases:
310    /// 1. Remove failed primary, increment epoch
311    /// 2. Select new primary by highest current_progress (Phase 1: GetLSN)
312    /// 3. Promote new primary with new epoch (Phase 4: Activate)
313    /// 4. Reconfigure quorum — epoch distributed to secondaries as part
314    ///    of the new configuration (best-effort, skip unreachable)
315    pub async fn failover(&mut self, failed_primary_id: ReplicaId) -> Result<()> {
316        assert_eq!(
317            Some(failed_primary_id),
318            self.primary_id,
319            "can only failover the current primary"
320        );
321
322        let new_epoch = self.next_epoch();
323        info!(failed = failed_primary_id, ?new_epoch, "starting failover");
324
325        // Remove the failed primary from our tracking
326        self.replicas.remove(&failed_primary_id);
327        self.primary_id = None;
328
329        if self.replicas.is_empty() {
330            return Err(KubericError::Internal(
331                "no surviving replicas for failover".into(),
332            ));
333        }
334
335        // 1. Select new primary by highest current_progress (LSN)
336        let new_primary_id = self
337            .replicas
338            .values()
339            .max_by_key(|e| e.handle.current_progress())
340            .map(|e| e.handle.id())
341            .unwrap();
342
343        info!(
344            new_primary = new_primary_id,
345            lsn = self.replicas[&new_primary_id].handle.current_progress(),
346            "selected new primary"
347        );
348
349        // 2. Promote new primary (SF Phase 4: Activate)
350        // The new epoch is delivered with the promotion — no separate
351        // fencing step needed. The old primary is dead and can't send ops.
352        self.replicas[&new_primary_id]
353            .handle
354            .change_role(new_epoch, Role::Primary)
355            .await?;
356        self.replicas.get_mut(&new_primary_id).unwrap().role = Role::Primary;
357        self.primary_id = Some(new_primary_id);
358
359        // 3. Distribute epoch to surviving secondaries (best-effort).
360        // Unreachable secondaries are skipped — they'll be rebuilt later.
361        // This prevents a zombie primary (if it recovers) from sending
362        // ops to secondaries that still accept the old epoch.
363        for (&id, entry) in &self.replicas {
364            if id != new_primary_id && entry.handle.update_epoch(new_epoch).await.is_err() {
365                warn!(
366                    replica_id = id,
367                    "failed to update epoch on secondary (will be rebuilt)"
368                );
369            }
370        }
371
372        // 4. Rebuild configuration (all surviving non-primary replicas)
373        let secondary_ids: Vec<ReplicaId> = self
374            .replicas
375            .keys()
376            .filter(|&&id| id != new_primary_id)
377            .cloned()
378            .collect();
379
380        let total_count = self.replicas.len() as u32;
381        let write_quorum = total_count / 2 + 1;
382
383        let members: Vec<ReplicaInfo> = secondary_ids
384            .iter()
385            .map(|&id| {
386                let entry = &self.replicas[&id];
387                ReplicaInfo {
388                    id,
389                    role: Role::ActiveSecondary,
390                    status: ReplicaStatus::Up,
391                    replicator_address: entry.handle.replicator_address(),
392                    current_progress: entry.handle.current_progress(),
393                    catch_up_capability: entry.handle.catch_up_capability(),
394                    must_catch_up: false,
395                }
396            })
397            .collect();
398
399        let new_config = ReplicaSetConfig {
400            members,
401            write_quorum,
402        };
403
404        // Update configuration on new primary
405        self.replicas[&new_primary_id]
406            .handle
407            .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
408            .await?;
409
410        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
411
412        self.replicas[&new_primary_id]
413            .handle
414            .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
415            .await?;
416
417        self.replicas[&new_primary_id]
418            .handle
419            .update_current_configuration(new_config.clone())
420            .await?;
421
422        self.current_config = new_config;
423
424        info!(
425            new_primary = new_primary_id,
426            epoch = ?self.epoch,
427            "failover complete"
428        );
429
430        Ok(())
431    }
432
433    // -----------------------------------------------------------------------
434    // Workflow: Switchover (planned primary change)
435    // -----------------------------------------------------------------------
436
437    /// Graceful primary change to a specific target secondary.
438    ///
439    /// Matches SF's SwapPrimary reconfiguration:
440    /// 1. Revoke write status on old primary (SF Phase 0: Demote)
441    /// 2. Demote old primary → ActiveSecondary
442    /// 3. Promote target → Primary (SF Phase 4: Activate)
443    /// 4. Distribute epoch to other secondaries (best-effort)
444    /// 5. Reconfigure quorum + catchup
445    pub async fn switchover(&mut self, target_id: ReplicaId) -> Result<()> {
446        let old_primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
447
448        assert_ne!(
449            old_primary_id, target_id,
450            "target must differ from current primary"
451        );
452        assert!(
453            self.replicas.contains_key(&target_id),
454            "target must be a known replica"
455        );
456
457        let new_epoch = self.next_epoch();
458        info!(
459            old_primary = old_primary_id,
460            new_primary = target_id,
461            ?new_epoch,
462            "starting switchover"
463        );
464
465        // 1. Revoke write status on old primary (SF Phase 0: Demote)
466        // New writes are immediately rejected; in-flight writes continue.
467        self.replicas[&old_primary_id]
468            .handle
469            .revoke_write_status()
470            .await?;
471
472        // 2. Demote old primary → ActiveSecondary
473        self.replicas[&old_primary_id]
474            .handle
475            .change_role(new_epoch, Role::ActiveSecondary)
476            .await?;
477        self.replicas.get_mut(&old_primary_id).unwrap().role = Role::ActiveSecondary;
478
479        // 3. Promote target → Primary (SF Phase 4: Activate)
480        // If this fails or times out, rollback: re-promote old primary
481        // (SF AbortPhase0Demote + RevertConfiguration pattern).
482        let promote_result = tokio::time::timeout(
483            std::time::Duration::from_secs(5),
484            self.replicas[&target_id]
485                .handle
486                .change_role(new_epoch, Role::Primary),
487        )
488        .await;
489
490        let promote_err = match promote_result {
491            Ok(Ok(())) => None,
492            Ok(Err(e)) => Some(e),
493            Err(_) => Some(KubericError::Internal("promotion timed out".into())),
494        };
495
496        if let Some(e) = promote_err {
497            warn!(
498                target_id,
499                error = %e,
500                "target promotion failed, rolling back — re-promoting old primary"
501            );
502            self.replicas[&old_primary_id]
503                .handle
504                .change_role(new_epoch, Role::Primary)
505                .await?;
506            self.replicas.get_mut(&old_primary_id).unwrap().role = Role::Primary;
507            self.primary_id = Some(old_primary_id);
508            return Err(e);
509        }
510        self.replicas.get_mut(&target_id).unwrap().role = Role::Primary;
511        self.primary_id = Some(target_id);
512
513        // 4. Distribute epoch to other secondaries (best-effort).
514        // Unreachable secondaries are skipped — they'll be rebuilt later.
515        // The old primary already has the epoch from step 2 (change_role).
516        // The target already has it from step 3.
517        for (&id, entry) in &self.replicas {
518            if id != old_primary_id
519                && id != target_id
520                && entry.handle.update_epoch(new_epoch).await.is_err()
521            {
522                warn!(
523                    replica_id = id,
524                    "failed to update epoch on secondary (will be rebuilt)"
525                );
526            }
527        }
528
529        // 5. Rebuild configuration
530        let secondary_ids: Vec<ReplicaId> = self
531            .replicas
532            .keys()
533            .filter(|&&id| id != target_id)
534            .cloned()
535            .collect();
536
537        let total_count = self.replicas.len() as u32;
538        let write_quorum = total_count / 2 + 1;
539
540        let members: Vec<ReplicaInfo> = secondary_ids
541            .iter()
542            .map(|&id| {
543                let entry = &self.replicas[&id];
544                ReplicaInfo {
545                    id,
546                    role: Role::ActiveSecondary,
547                    status: ReplicaStatus::Up,
548                    replicator_address: entry.handle.replicator_address(),
549                    current_progress: entry.handle.current_progress(),
550                    catch_up_capability: entry.handle.catch_up_capability(),
551                    must_catch_up: false,
552                }
553            })
554            .collect();
555
556        let new_config = ReplicaSetConfig {
557            members,
558            write_quorum,
559        };
560
561        self.replicas[&target_id]
562            .handle
563            .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
564            .await?;
565
566        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
567
568        self.replicas[&target_id]
569            .handle
570            .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
571            .await?;
572
573        self.replicas[&target_id]
574            .handle
575            .update_current_configuration(new_config.clone())
576            .await?;
577
578        self.current_config = new_config;
579
580        info!(
581            new_primary = target_id,
582            epoch = ?self.epoch,
583            "switchover complete"
584        );
585
586        Ok(())
587    }
588
589    // -----------------------------------------------------------------------
590    // Workflow: Remove Secondary (scale-down)
591    // -----------------------------------------------------------------------
592
593    /// Remove a secondary from the partition. Config-first: the configuration
594    /// is updated before the replica is closed, maintaining write quorum.
595    ///
596    /// 1. Verify not removing primary, and above min count
597    /// 2. Reconfigure without the target replica
598    /// 3. Change role to None + close the removed replica
599    /// 4. Remove from driver
600    pub async fn remove_secondary(
601        &mut self,
602        secondary_id: ReplicaId,
603        min_replicas: usize,
604    ) -> Result<()> {
605        let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
606        assert_ne!(
607            secondary_id, primary_id,
608            "cannot remove the primary — use switchover first"
609        );
610        assert!(
611            self.replicas.contains_key(&secondary_id),
612            "replica {} not found",
613            secondary_id
614        );
615        assert!(
616            self.replicas.len() > min_replicas,
617            "cannot scale below min_replicas ({})",
618            min_replicas
619        );
620
621        info!(secondary_id, "removing secondary (scale-down)");
622
623        // 1. Reconfigure without the target replica (config-first)
624        let secondary_ids: Vec<ReplicaId> = self
625            .replicas
626            .keys()
627            .filter(|&&id| id != primary_id && id != secondary_id)
628            .cloned()
629            .collect();
630
631        let total_count = (self.replicas.len() - 1) as u32; // after removal
632        let write_quorum = total_count / 2 + 1;
633
634        let members: Vec<ReplicaInfo> = secondary_ids
635            .iter()
636            .map(|&id| {
637                let entry = &self.replicas[&id];
638                ReplicaInfo {
639                    id,
640                    role: Role::ActiveSecondary,
641                    status: ReplicaStatus::Up,
642                    replicator_address: entry.handle.replicator_address(),
643                    current_progress: entry.handle.current_progress(),
644                    catch_up_capability: entry.handle.catch_up_capability(),
645                    must_catch_up: false,
646                }
647            })
648            .collect();
649
650        let new_config = ReplicaSetConfig {
651            members,
652            write_quorum,
653        };
654
655        self.replicas[&primary_id]
656            .handle
657            .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
658            .await?;
659
660        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
661
662        self.replicas[&primary_id]
663            .handle
664            .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
665            .await?;
666
667        self.replicas[&primary_id]
668            .handle
669            .update_current_configuration(new_config.clone())
670            .await?;
671
672        self.current_config = new_config;
673
674        // 2. Close the removed replica
675        let removed = self.replicas.remove(&secondary_id).unwrap();
676        let _ = removed.handle.change_role(self.epoch, Role::None).await;
677        let _ = removed.handle.close().await;
678
679        info!(
680            secondary_id,
681            remaining = self.replicas.len(),
682            "secondary removed"
683        );
684        Ok(())
685    }
686
687    // -----------------------------------------------------------------------
688    // Workflow: Add Replica (scale-up or rebuild)
689    // -----------------------------------------------------------------------
690
691    /// Add a new replica to the partition. The primary builds it via the
692    /// copy protocol, then it joins the quorum configuration.
693    ///
694    /// Used for:
695    /// - **Scale-up:** operator creates a new pod, calls add_replica
696    /// - **Restart:** restart_secondary calls this after closing the old handle
697    ///
698    /// Flow:
699    /// 1. Open + set epoch + assign idle role
700    /// 2. build_replica on primary (copies state via data plane)
701    /// 3. Promote idle → active
702    /// 4. Reconfigure quorum (must_catch_up on the new replica)
703    pub async fn add_replica(&mut self, handle: Box<dyn ReplicaHandle>) -> Result<()> {
704        let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
705        let replica_id = handle.id();
706
707        assert_ne!(
708            replica_id, primary_id,
709            "cannot add the primary as a secondary"
710        );
711        assert!(
712            !self.replicas.contains_key(&replica_id),
713            "replica {} already exists — use restart_secondary to replace",
714            replica_id
715        );
716
717        let epoch = self.epoch;
718        info!(replica_id, ?epoch, "adding replica");
719
720        // Store handle
721        self.replicas.insert(
722            replica_id,
723            ReplicaState {
724                handle,
725                role: Role::None,
726            },
727        );
728
729        // 1. Open + set epoch + assign idle role
730        let h = &self.replicas[&replica_id].handle;
731        h.open(OpenMode::New).await?;
732        h.update_epoch(epoch).await?;
733        h.change_role(epoch, Role::IdleSecondary).await?;
734        self.replicas.get_mut(&replica_id).unwrap().role = Role::IdleSecondary;
735
736        // 2. build_replica on primary (copies state via data plane)
737        let addr = self.replicas[&replica_id].handle.replicator_address();
738        let replica_info = ReplicaInfo {
739            id: replica_id,
740            role: Role::IdleSecondary,
741            status: ReplicaStatus::Up,
742            replicator_address: addr,
743            current_progress: -1,
744            catch_up_capability: -1,
745            must_catch_up: false,
746        };
747        self.replicas[&primary_id]
748            .handle
749            .build_replica(replica_info)
750            .await?;
751
752        // 3. Promote idle → active
753        self.replicas[&replica_id]
754            .handle
755            .change_role(epoch, Role::ActiveSecondary)
756            .await?;
757        self.replicas.get_mut(&replica_id).unwrap().role = Role::ActiveSecondary;
758
759        // 4. Reconfigure quorum (rebuild full config, must_catch_up on new replica)
760        self.reconfigure_quorum(primary_id, Some(replica_id))
761            .await?;
762
763        info!(replica_id, "replica added");
764        Ok(())
765    }
766
767    // -----------------------------------------------------------------------
768    // Workflow: Restart Secondary
769    // -----------------------------------------------------------------------
770
771    /// Restart a secondary replica. The old handle is replaced with a new one
772    /// (simulating pod restart with fresh state). The primary rebuilds it via
773    /// the copy protocol.
774    pub async fn restart_secondary(
775        &mut self,
776        secondary_id: ReplicaId,
777        new_handle: Box<dyn ReplicaHandle>,
778    ) -> Result<()> {
779        let primary_id = self.primary_id.ok_or(KubericError::NotPrimary)?;
780        assert_ne!(
781            secondary_id, primary_id,
782            "cannot restart the primary with restart_secondary"
783        );
784        assert!(
785            self.replicas.contains_key(&secondary_id),
786            "replica {} not found — use add_replica for new replicas",
787            secondary_id
788        );
789
790        info!(secondary_id, "restarting secondary");
791
792        // 1. Close old secondary (best effort — may be dead)
793        if let Some(old) = self.replicas.get(&secondary_id) {
794            let _ = old.handle.close().await;
795        }
796
797        // 2. Remove old handle, then add_replica with new one
798        self.replicas.remove(&secondary_id);
799
800        // Ensure new_handle has the same ID
801        assert_eq!(new_handle.id(), secondary_id);
802        self.add_replica(new_handle).await
803    }
804
805    // -----------------------------------------------------------------------
806    // Internal: Reconfigure quorum after adding/rebuilding a replica
807    // -----------------------------------------------------------------------
808
809    async fn reconfigure_quorum(
810        &mut self,
811        primary_id: ReplicaId,
812        must_catch_up_id: Option<ReplicaId>,
813    ) -> Result<()> {
814        let secondary_ids: Vec<ReplicaId> = self
815            .replicas
816            .keys()
817            .filter(|&&id| id != primary_id)
818            .cloned()
819            .collect();
820
821        let total_count = self.replicas.len() as u32;
822        let write_quorum = total_count / 2 + 1;
823
824        let members: Vec<ReplicaInfo> = secondary_ids
825            .iter()
826            .map(|&id| {
827                let entry = &self.replicas[&id];
828                ReplicaInfo {
829                    id,
830                    role: Role::ActiveSecondary,
831                    status: ReplicaStatus::Up,
832                    replicator_address: entry.handle.replicator_address(),
833                    current_progress: entry.handle.current_progress(),
834                    catch_up_capability: entry.handle.catch_up_capability(),
835                    must_catch_up: must_catch_up_id == Some(id),
836                }
837            })
838            .collect();
839
840        let new_config = ReplicaSetConfig {
841            members,
842            write_quorum,
843        };
844
845        self.replicas[&primary_id]
846            .handle
847            .update_catch_up_configuration(new_config.clone(), self.current_config.clone())
848            .await?;
849
850        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
851
852        self.replicas[&primary_id]
853            .handle
854            .wait_for_catch_up_quorum(ReplicaSetQuorumMode::Write)
855            .await?;
856
857        self.replicas[&primary_id]
858            .handle
859            .update_current_configuration(new_config.clone())
860            .await?;
861
862        self.current_config = new_config;
863        Ok(())
864    }
865}
866
867// ---------------------------------------------------------------------------
868// In-process ReplicaHandle implementation (for tests)
869// ---------------------------------------------------------------------------
870
871#[cfg(any(test, feature = "testing"))]
872pub mod testing {
873    use super::*;
874    use std::sync::Arc;
875
876    use tokio::sync::{mpsc, oneshot};
877    use tonic::transport::Server;
878
879    use crate::events::{ReplicateRequest, ReplicatorControlEvent};
880    use crate::handles::{PartitionState, StateReplicatorHandle};
881    use crate::proto::replicator_data_server::ReplicatorDataServer;
882    use crate::replicator::actor::WalReplicatorActor;
883    use crate::replicator::secondary::{SecondaryReceiver, SecondaryState};
884    use crate::types::{AccessStatus, CancellationToken};
885
886    /// In-process replica handle: wraps channels to a local replicator actor
887    /// and a local gRPC secondary server.
888    pub struct InProcessReplicaHandle {
889        id: ReplicaId,
890        control_tx: mpsc::Sender<ReplicatorControlEvent>,
891        data_tx: mpsc::Sender<ReplicateRequest>,
892        state: Arc<PartitionState>,
893        pub secondary_state: Arc<SecondaryState>,
894        grpc_address: String,
895        shutdown_token: CancellationToken,
896        _actor_handle: tokio::task::JoinHandle<()>,
897        _grpc_handle: tokio::task::JoinHandle<()>,
898    }
899
900    impl InProcessReplicaHandle {
901        /// Spawn a new in-process replica (actor + gRPC server).
902        pub async fn spawn(id: ReplicaId) -> Result<Self> {
903            let (control_tx, control_rx) = mpsc::channel(16);
904            let (data_tx, data_rx) = mpsc::channel::<ReplicateRequest>(256);
905            let state = Arc::new(PartitionState::new());
906            let secondary_state = Arc::new(SecondaryState::new());
907            let shutdown_token = CancellationToken::new();
908
909            // Start gRPC server with graceful shutdown
910            let receiver = SecondaryReceiver::new(secondary_state.clone());
911            let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
912                .await
913                .map_err(|e| KubericError::Internal(Box::new(e)))?;
914            let addr = listener.local_addr().unwrap();
915            let grpc_address = format!("http://{}", addr);
916
917            let grpc_shutdown = shutdown_token.child_token();
918            let grpc_handle = tokio::spawn(async move {
919                let _ = Server::builder()
920                    .add_service(ReplicatorDataServer::new(receiver))
921                    .serve_with_incoming_shutdown(
922                        tokio_stream::wrappers::TcpListenerStream::new(listener),
923                        grpc_shutdown.cancelled(),
924                    )
925                    .await;
926            });
927
928            // Start replicator actor
929            let actor = WalReplicatorActor::new(id);
930            let state_cp = state.clone();
931            // In-process replicas create a dummy state_provider_tx (not used in tests)
932            let (sp_tx, _sp_rx) = mpsc::unbounded_channel();
933            let actor_handle = tokio::spawn(async move {
934                actor.run(control_rx, data_rx, state_cp, sp_tx).await;
935            });
936
937            Ok(Self {
938                id,
939                control_tx,
940                data_tx,
941                state,
942                secondary_state,
943                grpc_address,
944                shutdown_token,
945                _actor_handle: actor_handle,
946                _grpc_handle: grpc_handle,
947            })
948        }
949
950        async fn send_control(
951            &self,
952            make: impl FnOnce(oneshot::Sender<Result<()>>) -> ReplicatorControlEvent,
953        ) -> Result<()> {
954            let (tx, rx) = oneshot::channel();
955            self.control_tx
956                .send(make(tx))
957                .await
958                .map_err(|_| KubericError::Closed)?;
959            rx.await.map_err(|_| KubericError::Closed)?
960        }
961
962        /// Get a user-facing StateReplicatorHandle for writing data (test helper).
963        pub fn state_replicator(&self) -> StateReplicatorHandle {
964            StateReplicatorHandle::new(self.data_tx.clone(), self.state.clone())
965        }
966    }
967
968    #[async_trait]
969    impl ReplicaHandle for InProcessReplicaHandle {
970        fn id(&self) -> ReplicaId {
971            self.id
972        }
973
974        async fn open(&self, mode: OpenMode) -> Result<()> {
975            self.send_control(|reply| ReplicatorControlEvent::Open { mode, reply })
976                .await
977        }
978
979        async fn close(&self) -> Result<()> {
980            let result = self
981                .send_control(|reply| ReplicatorControlEvent::Close { reply })
982                .await;
983            self.shutdown_token.cancel();
984            result
985        }
986
987        fn abort(&self) {
988            let _ = self.control_tx.try_send(ReplicatorControlEvent::Abort);
989            self.shutdown_token.cancel();
990        }
991
992        async fn change_role(&self, epoch: Epoch, role: Role) -> Result<()> {
993            self.secondary_state.update_epoch(epoch);
994            self.send_control(|reply| ReplicatorControlEvent::ChangeRole { epoch, role, reply })
995                .await?;
996            // Mirror PodRuntime: set access status based on role
997            match role {
998                Role::Primary => {
999                    self.state.set_read_status(AccessStatus::Granted);
1000                    self.state.set_write_status(AccessStatus::Granted);
1001                }
1002                _ => {
1003                    self.state.set_read_status(AccessStatus::NotPrimary);
1004                    self.state.set_write_status(AccessStatus::NotPrimary);
1005                }
1006            }
1007            Ok(())
1008        }
1009
1010        async fn update_epoch(&self, epoch: Epoch) -> Result<()> {
1011            self.secondary_state.update_epoch(epoch);
1012            self.send_control(|reply| ReplicatorControlEvent::UpdateEpoch { epoch, reply })
1013                .await
1014        }
1015
1016        fn current_progress(&self) -> Lsn {
1017            self.state.current_progress()
1018        }
1019
1020        fn catch_up_capability(&self) -> Lsn {
1021            self.state.catch_up_capability()
1022        }
1023
1024        async fn on_data_loss(&self) -> Result<DataLossAction> {
1025            let (tx, rx) = oneshot::channel();
1026            self.control_tx
1027                .send(ReplicatorControlEvent::OnDataLoss { reply: tx })
1028                .await
1029                .map_err(|_| KubericError::Closed)?;
1030            rx.await.map_err(|_| KubericError::Closed)?
1031        }
1032
1033        async fn update_catch_up_configuration(
1034            &self,
1035            current: ReplicaSetConfig,
1036            previous: ReplicaSetConfig,
1037        ) -> Result<()> {
1038            let (tx, rx) = oneshot::channel();
1039            self.control_tx
1040                .send(ReplicatorControlEvent::UpdateCatchUpConfiguration {
1041                    current,
1042                    previous,
1043                    reply: tx,
1044                })
1045                .await
1046                .map_err(|_| KubericError::Closed)?;
1047            rx.await.map_err(|_| KubericError::Closed)?
1048        }
1049
1050        async fn update_current_configuration(&self, current: ReplicaSetConfig) -> Result<()> {
1051            let (tx, rx) = oneshot::channel();
1052            self.control_tx
1053                .send(ReplicatorControlEvent::UpdateCurrentConfiguration { current, reply: tx })
1054                .await
1055                .map_err(|_| KubericError::Closed)?;
1056            rx.await.map_err(|_| KubericError::Closed)?
1057        }
1058
1059        async fn wait_for_catch_up_quorum(&self, mode: ReplicaSetQuorumMode) -> Result<()> {
1060            self.send_control(|reply| ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply })
1061                .await
1062        }
1063
1064        async fn build_replica(&self, replica: ReplicaInfo) -> Result<()> {
1065            self.send_control(|reply| ReplicatorControlEvent::BuildReplica { replica, reply })
1066                .await
1067        }
1068
1069        async fn remove_replica(&self, replica_id: ReplicaId) -> Result<()> {
1070            self.send_control(|reply| ReplicatorControlEvent::RemoveReplica { replica_id, reply })
1071                .await
1072        }
1073
1074        async fn revoke_write_status(&self) -> Result<()> {
1075            self.state
1076                .set_write_status(AccessStatus::ReconfigurationPending);
1077            Ok(())
1078        }
1079
1080        fn replicator_address(&self) -> String {
1081            self.grpc_address.clone()
1082        }
1083    }
1084
1085    /// Convenience: spawn N in-process replicas.
1086    pub async fn spawn_replicas(count: usize) -> Result<Vec<Box<dyn ReplicaHandle>>> {
1087        let mut handles: Vec<Box<dyn ReplicaHandle>> = Vec::new();
1088        for i in 1..=(count as ReplicaId) {
1089            handles.push(Box::new(InProcessReplicaHandle::spawn(i).await?));
1090        }
1091        Ok(handles)
1092    }
1093}