// kuberic_core/replicator/actor.rs

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{Mutex as TokioMutex, mpsc};
use tracing::{info, warn};

use crate::error::KubericError;
use crate::events::{ReplicateRequest, ReplicatorControlEvent, StateProviderEvent};
use crate::handles::PartitionState;
use crate::replicator::primary::PrimarySender;
use crate::replicator::queue::ReplicationQueue;
use crate::replicator::quorum::QuorumTracker;
use crate::types::{DataLossAction, Epoch, Lsn, ReplicaId, Role};
14
15/// The WalReplicator actor. Processes control and data events in a single
16/// loop with biased select (control has priority). The data path is
17/// non-blocking because PrimarySender::send_to_all uses unbounded channels
18/// with per-secondary drain tasks (matching SF's async dispatch model).
19///
20/// Owns a ReplicationQueue that retains ops for replay to new replicas,
21/// matching SF's ReplicationQueueManager pattern.
22pub struct WalReplicatorActor {
23    replica_id: ReplicaId,
24}
25
26impl WalReplicatorActor {
27    pub fn new(replica_id: ReplicaId) -> Self {
28        Self { replica_id }
29    }
30
31    #[allow(unused_assignments)]
32    pub async fn run(
33        self,
34        mut control_rx: mpsc::Receiver<ReplicatorControlEvent>,
35        mut data_rx: mpsc::Receiver<ReplicateRequest>,
36        state: Arc<PartitionState>,
37        state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
38    ) {
39        let mut role = Role::None;
40        let mut epoch = Epoch::default();
41        let mut next_lsn: Lsn = 1;
42        let quorum_tracker = Arc::new(TokioMutex::new(QuorumTracker::new()));
43        let mut primary_sender: Option<PrimarySender> = None;
44        let mut replication_queue = ReplicationQueue::new();
45
46        loop {
47            tokio::select! {
48                biased;
49
50                event = control_rx.recv() => {
51                    let Some(event) = event else { break };
52                    match event {
53                        ReplicatorControlEvent::Open { reply, .. } => {
54                            info!(replica_id = self.replica_id, "replicator opened");
55                            let _ = reply.send(Ok(()));
56                        }
57                        ReplicatorControlEvent::Close { reply } => {
58                            info!(replica_id = self.replica_id, "replicator closing");
59                            quorum_tracker.lock().await.fail_all(KubericError::Closed);
60                            if let Some(mut sender) = primary_sender.take() {
61                                sender.close_all();
62                            }
63                            replication_queue.clear();
64                            let _ = reply.send(Ok(()));
65                            break;
66                        }
67                        ReplicatorControlEvent::Abort => {
68                            quorum_tracker.lock().await.fail_all(KubericError::Closed);
69                            if let Some(mut sender) = primary_sender.take() {
70                                sender.close_all();
71                            }
72                            replication_queue.clear();
73                            break;
74                        }
75                        ReplicatorControlEvent::ChangeRole {
76                            epoch: new_epoch,
77                            role: new_role,
78                            reply,
79                        } => {
80                            info!(
81                                replica_id = self.replica_id,
82                                ?new_role,
83                                ?new_epoch,
84                                "replicator changing role"
85                            );
86
87                            if role == Role::Primary && new_role != Role::Primary {
88                                quorum_tracker.lock().await.fail_all(KubericError::NotPrimary);
89                                if let Some(mut sender) = primary_sender.take() {
90                                    sender.close_all();
91                                }
92                                replication_queue.clear();
93                            }
94
95                            epoch = new_epoch;
96                            role = new_role;
97
98                            if role == Role::Primary {
99                                primary_sender = Some(PrimarySender::new(self.replica_id, epoch));
100                            }
101
102                            let _ = reply.send(Ok(()));
103                        }
104                        ReplicatorControlEvent::UpdateEpoch {
105                            epoch: new_epoch,
106                            reply,
107                        } => {
108                            info!(
109                                replica_id = self.replica_id,
110                                ?new_epoch,
111                                "updating epoch"
112                            );
113                            // Update local epoch first
114                            epoch = new_epoch;
115
116                            // Forward to state provider (inline — must complete before next event)
117                            let prev_lsn = state.committed_lsn();
118                            let (sp_tx, sp_rx) = tokio::sync::oneshot::channel();
119                            if state_provider_tx.send(StateProviderEvent::UpdateEpoch {
120                                epoch: new_epoch,
121                                previous_epoch_last_lsn: prev_lsn,
122                                reply: sp_tx,
123                            }).is_err() {
124                                let _ = reply.send(Err(KubericError::Closed));
125                                continue;
126                            }
127                            match tokio::time::timeout(
128                                std::time::Duration::from_secs(30), sp_rx
129                            ).await {
130                                Ok(Ok(result)) => { let _ = reply.send(result); }
131                                Ok(Err(_)) => { let _ = reply.send(Err(KubericError::Closed)); }
132                                Err(_) => { let _ = reply.send(Err(KubericError::Internal(
133                                    "state provider UpdateEpoch timeout".into()))); }
134                            }
135                        }
136                        ReplicatorControlEvent::UpdateCatchUpConfiguration {
137                            current,
138                            previous,
139                            reply,
140                        } => {
141                            let mut cc_members: HashSet<ReplicaId> =
142                                current.members.iter().map(|r| r.id).collect();
143                            cc_members.insert(self.replica_id);
144                            let mut pc_members: HashSet<ReplicaId> =
145                                previous.members.iter().map(|r| r.id).collect();
146                            if !pc_members.is_empty() {
147                                pc_members.insert(self.replica_id);
148                            }
149
150                            let must_catch_up: HashSet<ReplicaId> = current
151                                .members
152                                .iter()
153                                .filter(|r| r.must_catch_up)
154                                .map(|r| r.id)
155                                .collect();
156
157                            let member_progress: HashMap<ReplicaId, Lsn> = current
158                                .members
159                                .iter()
160                                .map(|r| (r.id, r.current_progress))
161                                .collect();
162
163                            quorum_tracker.lock().await.set_catch_up_configuration(
164                                cc_members,
165                                current.write_quorum,
166                                pc_members,
167                                previous.write_quorum,
168                                must_catch_up,
169                                member_progress,
170                            );
171
172                            // Connect new secondaries and replay pending ops
173                            if let Some(sender) = &mut primary_sender {
174                                for member in &current.members {
175                                    if member.id != self.replica_id
176                                        && !sender.has_connection(&member.id)
177                                    {
178                                        if let Err(e) = sender
179                                            .add_secondary(
180                                                member.id,
181                                                member.replicator_address.clone(),
182                                                quorum_tracker.clone(),
183                                                state.clone(),
184                                            )
185                                            .await
186                                        {
187                                            warn!(
188                                                replica_id = member.id,
189                                                error = %e,
190                                                "failed to connect to secondary"
191                                            );
192                                            continue;
193                                        }
194
195                                        // Replay ops beyond the copy boundary.
196                                        // copy_lsn is the snapshot LSN recorded by
197                                        // run_build_replica_copy — the secondary
198                                        // already has state through this LSN.
199                                        let copy_lsn = state
200                                            .take_copy_lsn(&member.id)
201                                            .unwrap_or(0);
202                                        let replay_from = copy_lsn + 1;
203                                        let pending = replication_queue.ops_from(replay_from);
204                                        if !pending.is_empty() {
205                                            info!(
206                                                replica_id = member.id,
207                                                copy_lsn,
208                                                replay_from,
209                                                count = pending.len(),
210                                                "replaying ops from replication queue"
211                                            );
212                                            for (lsn, data) in &pending {
213                                                sender.send_to_one(member.id, *lsn, data, state.committed_lsn());
214                                            }
215                                        }
216                                    }
217                                }
218                            }
219
220                            let _ = reply.send(Ok(()));
221                        }
222                        ReplicatorControlEvent::UpdateCurrentConfiguration {
223                            current,
224                            reply,
225                        } => {
226                            let mut cc_members: HashSet<ReplicaId> =
227                                current.members.iter().map(|r| r.id).collect();
228                            cc_members.insert(self.replica_id);
229
230                            quorum_tracker.lock().await.set_current_configuration(
231                                cc_members.clone(),
232                                current.write_quorum,
233                            );
234
235                            if let Some(sender) = &mut primary_sender {
236                                let to_remove: Vec<ReplicaId> = sender
237                                    .connected_ids()
238                                    .into_iter()
239                                    .filter(|id| !cc_members.contains(id))
240                                    .collect();
241                                for id in to_remove {
242                                    sender.remove_secondary(id);
243                                }
244                            }
245
246                            let _ = reply.send(Ok(()));
247
248                            // GC replication queue — config is finalized,
249                            // all replicas are caught up. Safe to remove
250                            // ops up to committed_lsn.
251                            let committed = state.committed_lsn();
252                            replication_queue.gc(committed);
253                        }
254                        ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply } => {
255                            quorum_tracker.lock().await.wait_for_catch_up(mode, reply);
256                        }
257                        ReplicatorControlEvent::BuildReplica { replica, reply } => {
258                            // Replication queue ops are replayed at add_secondary time.
259                            // Spawn the copy protocol as a background task.
260                            info!(
261                                replica_id = replica.id,
262                                queue_len = replication_queue.len(),
263                                "BuildReplica: spawning copy task"
264                            );
265                            let sp_tx = state_provider_tx.clone();
266                            let st = state.clone();
267                            tokio::spawn(async move {
268                                let result = crate::replicator::copy::run_build_replica_copy(
269                                    replica,
270                                    sp_tx,
271                                    st,
272                                    std::time::Duration::from_secs(30),
273                                ).await;
274                                let _ = reply.send(result);
275                            });
276                        }
277                        ReplicatorControlEvent::RemoveReplica { replica_id, reply } => {
278                            if let Some(sender) = &mut primary_sender {
279                                sender.remove_secondary(replica_id);
280                            }
281                            let _ = reply.send(Ok(()));
282                        }
283                        ReplicatorControlEvent::OnDataLoss { reply } => {
284                            // Forward to state provider, convert bool → DataLossAction
285                            let (sp_tx, sp_rx) = tokio::sync::oneshot::channel();
286                            if state_provider_tx.send(StateProviderEvent::OnDataLoss {
287                                reply: sp_tx,
288                            }).is_err() {
289                                let _ = reply.send(Err(KubericError::Closed));
290                                continue;
291                            }
292                            match tokio::time::timeout(
293                                std::time::Duration::from_secs(30), sp_rx
294                            ).await {
295                                Ok(Ok(Ok(state_changed))) => {
296                                    let action = if state_changed {
297                                        DataLossAction::StateChanged
298                                    } else {
299                                        DataLossAction::None
300                                    };
301                                    let _ = reply.send(Ok(action));
302                                }
303                                Ok(Ok(Err(e))) => { let _ = reply.send(Err(e)); }
304                                Ok(Err(_)) => { let _ = reply.send(Err(KubericError::Closed)); }
305                                Err(_) => { let _ = reply.send(Err(KubericError::Internal(
306                                    "state provider OnDataLoss timeout".into()))); }
307                            }
308                        }
309                    }
310                }
311
312                req = data_rx.recv(), if role == Role::Primary => {
313                    let Some(req) = req else { break };
314                    let lsn = next_lsn;
315                    next_lsn += 1;
316
317                    // Store in replication queue for replay to new replicas
318                    replication_queue.push(lsn, req.data.clone());
319
320                    // Register with quorum tracker (primary's own ACK counted)
321                    quorum_tracker.lock().await.register(lsn, self.replica_id, req.reply);
322
323                    // Read committed_lsn AFTER register — the registration may
324                    // have triggered immediate commit (single replica case), and
325                    // previous ops' ACKs may have been processed by the background
326                    // ACK reader, advancing committed_lsn further.
327                    let committed = quorum_tracker.lock().await.committed_lsn();
328                    state.set_current_progress(lsn);
329                    state.set_committed_lsn(committed);
330
331                    // Non-blocking: send_to_all uses unbounded channels.
332                    // Include committed_lsn so secondaries can track commit progress.
333                    if let Some(sender) = &mut primary_sender {
334                        sender.send_to_all(lsn, &req.data, committed);
335                    }
336                }
337
338                else => break,
339            }
340        }
341    }
342}