kuberic_core/replicator/actor.rs
1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use tokio::sync::{Mutex as TokioMutex, mpsc};
5use tracing::{info, warn};
6
7use crate::error::KubericError;
8use crate::events::{ReplicateRequest, ReplicatorControlEvent, StateProviderEvent};
9use crate::handles::PartitionState;
10use crate::replicator::primary::PrimarySender;
11use crate::replicator::queue::ReplicationQueue;
12use crate::replicator::quorum::QuorumTracker;
13use crate::types::{DataLossAction, Epoch, Lsn, ReplicaId, Role};
14
15/// The WalReplicator actor. Processes control and data events in a single
16/// loop with biased select (control has priority). The data path is
17/// non-blocking because PrimarySender::send_to_all uses unbounded channels
18/// with per-secondary drain tasks (matching SF's async dispatch model).
19///
20/// Owns a ReplicationQueue that retains ops for replay to new replicas,
21/// matching SF's ReplicationQueueManager pattern.
22pub struct WalReplicatorActor {
23 replica_id: ReplicaId,
24}
25
26impl WalReplicatorActor {
27 pub fn new(replica_id: ReplicaId) -> Self {
28 Self { replica_id }
29 }
30
31 #[allow(unused_assignments)]
32 pub async fn run(
33 self,
34 mut control_rx: mpsc::Receiver<ReplicatorControlEvent>,
35 mut data_rx: mpsc::Receiver<ReplicateRequest>,
36 state: Arc<PartitionState>,
37 state_provider_tx: mpsc::UnboundedSender<StateProviderEvent>,
38 ) {
39 let mut role = Role::None;
40 let mut epoch = Epoch::default();
41 let mut next_lsn: Lsn = 1;
42 let quorum_tracker = Arc::new(TokioMutex::new(QuorumTracker::new()));
43 let mut primary_sender: Option<PrimarySender> = None;
44 let mut replication_queue = ReplicationQueue::new();
45
46 loop {
47 tokio::select! {
48 biased;
49
50 event = control_rx.recv() => {
51 let Some(event) = event else { break };
52 match event {
53 ReplicatorControlEvent::Open { reply, .. } => {
54 info!(replica_id = self.replica_id, "replicator opened");
55 let _ = reply.send(Ok(()));
56 }
57 ReplicatorControlEvent::Close { reply } => {
58 info!(replica_id = self.replica_id, "replicator closing");
59 quorum_tracker.lock().await.fail_all(KubericError::Closed);
60 if let Some(mut sender) = primary_sender.take() {
61 sender.close_all();
62 }
63 replication_queue.clear();
64 let _ = reply.send(Ok(()));
65 break;
66 }
67 ReplicatorControlEvent::Abort => {
68 quorum_tracker.lock().await.fail_all(KubericError::Closed);
69 if let Some(mut sender) = primary_sender.take() {
70 sender.close_all();
71 }
72 replication_queue.clear();
73 break;
74 }
75 ReplicatorControlEvent::ChangeRole {
76 epoch: new_epoch,
77 role: new_role,
78 reply,
79 } => {
80 info!(
81 replica_id = self.replica_id,
82 ?new_role,
83 ?new_epoch,
84 "replicator changing role"
85 );
86
87 if role == Role::Primary && new_role != Role::Primary {
88 quorum_tracker.lock().await.fail_all(KubericError::NotPrimary);
89 if let Some(mut sender) = primary_sender.take() {
90 sender.close_all();
91 }
92 replication_queue.clear();
93 }
94
95 epoch = new_epoch;
96 role = new_role;
97
98 if role == Role::Primary {
99 primary_sender = Some(PrimarySender::new(self.replica_id, epoch));
100 }
101
102 let _ = reply.send(Ok(()));
103 }
104 ReplicatorControlEvent::UpdateEpoch {
105 epoch: new_epoch,
106 reply,
107 } => {
108 info!(
109 replica_id = self.replica_id,
110 ?new_epoch,
111 "updating epoch"
112 );
113 // Update local epoch first
114 epoch = new_epoch;
115
116 // Forward to state provider (inline — must complete before next event)
117 let prev_lsn = state.committed_lsn();
118 let (sp_tx, sp_rx) = tokio::sync::oneshot::channel();
119 if state_provider_tx.send(StateProviderEvent::UpdateEpoch {
120 epoch: new_epoch,
121 previous_epoch_last_lsn: prev_lsn,
122 reply: sp_tx,
123 }).is_err() {
124 let _ = reply.send(Err(KubericError::Closed));
125 continue;
126 }
127 match tokio::time::timeout(
128 std::time::Duration::from_secs(30), sp_rx
129 ).await {
130 Ok(Ok(result)) => { let _ = reply.send(result); }
131 Ok(Err(_)) => { let _ = reply.send(Err(KubericError::Closed)); }
132 Err(_) => { let _ = reply.send(Err(KubericError::Internal(
133 "state provider UpdateEpoch timeout".into()))); }
134 }
135 }
136 ReplicatorControlEvent::UpdateCatchUpConfiguration {
137 current,
138 previous,
139 reply,
140 } => {
141 let mut cc_members: HashSet<ReplicaId> =
142 current.members.iter().map(|r| r.id).collect();
143 cc_members.insert(self.replica_id);
144 let mut pc_members: HashSet<ReplicaId> =
145 previous.members.iter().map(|r| r.id).collect();
146 if !pc_members.is_empty() {
147 pc_members.insert(self.replica_id);
148 }
149
150 let must_catch_up: HashSet<ReplicaId> = current
151 .members
152 .iter()
153 .filter(|r| r.must_catch_up)
154 .map(|r| r.id)
155 .collect();
156
157 let member_progress: HashMap<ReplicaId, Lsn> = current
158 .members
159 .iter()
160 .map(|r| (r.id, r.current_progress))
161 .collect();
162
163 quorum_tracker.lock().await.set_catch_up_configuration(
164 cc_members,
165 current.write_quorum,
166 pc_members,
167 previous.write_quorum,
168 must_catch_up,
169 member_progress,
170 );
171
172 // Connect new secondaries and replay pending ops
173 if let Some(sender) = &mut primary_sender {
174 for member in ¤t.members {
175 if member.id != self.replica_id
176 && !sender.has_connection(&member.id)
177 {
178 if let Err(e) = sender
179 .add_secondary(
180 member.id,
181 member.replicator_address.clone(),
182 quorum_tracker.clone(),
183 state.clone(),
184 )
185 .await
186 {
187 warn!(
188 replica_id = member.id,
189 error = %e,
190 "failed to connect to secondary"
191 );
192 continue;
193 }
194
195 // Replay ops beyond the copy boundary.
196 // copy_lsn is the snapshot LSN recorded by
197 // run_build_replica_copy — the secondary
198 // already has state through this LSN.
199 let copy_lsn = state
200 .take_copy_lsn(&member.id)
201 .unwrap_or(0);
202 let replay_from = copy_lsn + 1;
203 let pending = replication_queue.ops_from(replay_from);
204 if !pending.is_empty() {
205 info!(
206 replica_id = member.id,
207 copy_lsn,
208 replay_from,
209 count = pending.len(),
210 "replaying ops from replication queue"
211 );
212 for (lsn, data) in &pending {
213 sender.send_to_one(member.id, *lsn, data, state.committed_lsn());
214 }
215 }
216 }
217 }
218 }
219
220 let _ = reply.send(Ok(()));
221 }
222 ReplicatorControlEvent::UpdateCurrentConfiguration {
223 current,
224 reply,
225 } => {
226 let mut cc_members: HashSet<ReplicaId> =
227 current.members.iter().map(|r| r.id).collect();
228 cc_members.insert(self.replica_id);
229
230 quorum_tracker.lock().await.set_current_configuration(
231 cc_members.clone(),
232 current.write_quorum,
233 );
234
235 if let Some(sender) = &mut primary_sender {
236 let to_remove: Vec<ReplicaId> = sender
237 .connected_ids()
238 .into_iter()
239 .filter(|id| !cc_members.contains(id))
240 .collect();
241 for id in to_remove {
242 sender.remove_secondary(id);
243 }
244 }
245
246 let _ = reply.send(Ok(()));
247
248 // GC replication queue — config is finalized,
249 // all replicas are caught up. Safe to remove
250 // ops up to committed_lsn.
251 let committed = state.committed_lsn();
252 replication_queue.gc(committed);
253 }
254 ReplicatorControlEvent::WaitForCatchUpQuorum { mode, reply } => {
255 quorum_tracker.lock().await.wait_for_catch_up(mode, reply);
256 }
257 ReplicatorControlEvent::BuildReplica { replica, reply } => {
258 // Replication queue ops are replayed at add_secondary time.
259 // Spawn the copy protocol as a background task.
260 info!(
261 replica_id = replica.id,
262 queue_len = replication_queue.len(),
263 "BuildReplica: spawning copy task"
264 );
265 let sp_tx = state_provider_tx.clone();
266 let st = state.clone();
267 tokio::spawn(async move {
268 let result = crate::replicator::copy::run_build_replica_copy(
269 replica,
270 sp_tx,
271 st,
272 std::time::Duration::from_secs(30),
273 ).await;
274 let _ = reply.send(result);
275 });
276 }
277 ReplicatorControlEvent::RemoveReplica { replica_id, reply } => {
278 if let Some(sender) = &mut primary_sender {
279 sender.remove_secondary(replica_id);
280 }
281 let _ = reply.send(Ok(()));
282 }
283 ReplicatorControlEvent::OnDataLoss { reply } => {
284 // Forward to state provider, convert bool → DataLossAction
285 let (sp_tx, sp_rx) = tokio::sync::oneshot::channel();
286 if state_provider_tx.send(StateProviderEvent::OnDataLoss {
287 reply: sp_tx,
288 }).is_err() {
289 let _ = reply.send(Err(KubericError::Closed));
290 continue;
291 }
292 match tokio::time::timeout(
293 std::time::Duration::from_secs(30), sp_rx
294 ).await {
295 Ok(Ok(Ok(state_changed))) => {
296 let action = if state_changed {
297 DataLossAction::StateChanged
298 } else {
299 DataLossAction::None
300 };
301 let _ = reply.send(Ok(action));
302 }
303 Ok(Ok(Err(e))) => { let _ = reply.send(Err(e)); }
304 Ok(Err(_)) => { let _ = reply.send(Err(KubericError::Closed)); }
305 Err(_) => { let _ = reply.send(Err(KubericError::Internal(
306 "state provider OnDataLoss timeout".into()))); }
307 }
308 }
309 }
310 }
311
312 req = data_rx.recv(), if role == Role::Primary => {
313 let Some(req) = req else { break };
314 let lsn = next_lsn;
315 next_lsn += 1;
316
317 // Store in replication queue for replay to new replicas
318 replication_queue.push(lsn, req.data.clone());
319
320 // Register with quorum tracker (primary's own ACK counted)
321 quorum_tracker.lock().await.register(lsn, self.replica_id, req.reply);
322
323 // Read committed_lsn AFTER register — the registration may
324 // have triggered immediate commit (single replica case), and
325 // previous ops' ACKs may have been processed by the background
326 // ACK reader, advancing committed_lsn further.
327 let committed = quorum_tracker.lock().await.committed_lsn();
328 state.set_current_progress(lsn);
329 state.set_committed_lsn(committed);
330
331 // Non-blocking: send_to_all uses unbounded channels.
332 // Include committed_lsn so secondaries can track commit progress.
333 if let Some(sender) = &mut primary_sender {
334 sender.send_to_all(lsn, &req.data, committed);
335 }
336 }
337
338 else => break,
339 }
340 }
341 }
342}