Skip to main content

kuberic_core/
noop.rs

1use std::sync::Arc;
2
3use tokio::sync::mpsc;
4
5use crate::events::{ReplicateRequest, ReplicatorControlEvent};
6use crate::handles::PartitionState;
7use crate::types::{Lsn, Role};
8
/// A no-op replicator for testing.
///
/// The type carries no state; all behavior lives in [`NoopReplicator::run`],
/// which acknowledges every control event that has a reply channel and, while
/// the role is `Primary`, answers each replicate request immediately with the
/// next sequential LSN (starting at 1).
///
/// Note: The replicator does NOT set read/write access status — that is the
/// runtime's responsibility. The replicator only updates the LSN progress
/// values (current progress and committed LSN) on `PartitionState`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopReplicator;
16
17impl NoopReplicator {
18    /// Run the noop replicator actor. Processes control and data channels
19    /// until both are closed or an Abort/Close event is received.
20    pub async fn run(
21        mut control_rx: mpsc::Receiver<ReplicatorControlEvent>,
22        mut data_rx: mpsc::Receiver<ReplicateRequest>,
23        state: Arc<PartitionState>,
24    ) {
25        let mut role = Role::None;
26        let mut next_lsn: Lsn = 1;
27
28        loop {
29            tokio::select! {
30                biased;
31
32                event = control_rx.recv() => {
33                    let Some(event) = event else { break };
34                    match event {
35                        ReplicatorControlEvent::Open { reply, .. } => {
36                            let _ = reply.send(Ok(()));
37                        }
38                        ReplicatorControlEvent::Close { reply } => {
39                            let _ = reply.send(Ok(()));
40                            break;
41                        }
42                        ReplicatorControlEvent::Abort => {
43                            break;
44                        }
45                        ReplicatorControlEvent::ChangeRole {
46                            role: new_role,
47                            reply,
48                            ..
49                        } => {
50                            role = new_role;
51                            let _ = reply.send(Ok(()));
52                        }
53                        ReplicatorControlEvent::UpdateEpoch { reply, .. } => {
54                            let _ = reply.send(Ok(()));
55                        }
56                        ReplicatorControlEvent::OnDataLoss { reply } => {
57                            let _ = reply.send(Ok(crate::types::DataLossAction::None));
58                        }
59                        ReplicatorControlEvent::UpdateCatchUpConfiguration { reply, .. } => {
60                            let _ = reply.send(Ok(()));
61                        }
62                        ReplicatorControlEvent::UpdateCurrentConfiguration { reply, .. } => {
63                            let _ = reply.send(Ok(()));
64                        }
65                        ReplicatorControlEvent::WaitForCatchUpQuorum { reply, .. } => {
66                            let _ = reply.send(Ok(()));
67                        }
68                        ReplicatorControlEvent::BuildReplica { reply, .. } => {
69                            let _ = reply.send(Ok(()));
70                        }
71                        ReplicatorControlEvent::RemoveReplica { reply, .. } => {
72                            let _ = reply.send(Ok(()));
73                        }
74                    }
75                }
76
77                req = data_rx.recv(), if role == Role::Primary => {
78                    let Some(req) = req else { break };
79                    let lsn = next_lsn;
80                    next_lsn += 1;
81                    state.set_current_progress(lsn);
82                    state.set_committed_lsn(lsn);
83                    let _ = req.reply.send(Ok(lsn));
84                }
85
86                else => break,
87            }
88        }
89    }
90}
91
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::ReplicateRequest;
    use crate::types::{AccessStatus, CancellationToken, Epoch, OpenMode};

    /// Sending half of the replicator's control channel.
    type ControlTx = tokio::sync::mpsc::Sender<ReplicatorControlEvent>;
    /// Sending half of the replicator's data channel.
    type DataTx = tokio::sync::mpsc::Sender<ReplicateRequest>;

    /// Simulates what the runtime does: sets access status around replicator calls.
    fn runtime_set_status_for_role(state: &PartitionState, role: Role) {
        match role {
            Role::Primary => {
                state.set_read_status(AccessStatus::Granted);
                state.set_write_status(AccessStatus::Granted);
            }
            _ => {
                state.set_read_status(AccessStatus::NotPrimary);
                state.set_write_status(AccessStatus::NotPrimary);
            }
        }
    }

    /// Creates both channels and a fresh `PartitionState`, then spawns
    /// `NoopReplicator::run` on them. Must be called from inside a runtime.
    fn spawn_noop() -> (
        ControlTx,
        DataTx,
        Arc<PartitionState>,
        tokio::task::JoinHandle<()>,
    ) {
        let (control_tx, control_rx) = tokio::sync::mpsc::channel(16);
        let (data_tx, data_rx) = tokio::sync::mpsc::channel::<ReplicateRequest>(16);
        let state = Arc::new(PartitionState::new());

        let state_cp = state.clone();
        let task = tokio::spawn(async move {
            NoopReplicator::run(control_rx, data_rx, state_cp).await;
        });

        (control_tx, data_tx, state, task)
    }

    /// Sends `Open` and waits for a successful reply.
    async fn send_open(control_tx: &ControlTx) {
        let (reply, done) = tokio::sync::oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::Open {
                mode: OpenMode::New,
                reply,
            })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Sends `ChangeRole` and waits for a successful reply.
    async fn send_change_role(control_tx: &ControlTx, epoch: Epoch, role: Role) {
        let (reply, done) = tokio::sync::oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::ChangeRole { epoch, role, reply })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Sends `Close` and waits for a successful reply.
    async fn send_close(control_tx: &ControlTx) {
        let (reply, done) = tokio::sync::oneshot::channel();
        control_tx
            .send(ReplicatorControlEvent::Close { reply })
            .await
            .unwrap();
        done.await.unwrap().unwrap();
    }

    /// Pushes one request straight onto the data channel and returns the LSN
    /// the replicator replied with.
    async fn send_replicate(data_tx: &DataTx, payload: &'static str) -> Lsn {
        let (reply, done) = tokio::sync::oneshot::channel();
        data_tx
            .send(ReplicateRequest {
                data: bytes::Bytes::from(payload),
                reply,
            })
            .await
            .unwrap();
        done.await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn test_noop_lifecycle() {
        let (control_tx, data_tx, state, task) = spawn_noop();

        send_open(&control_tx).await;

        send_change_role(&control_tx, Epoch::new(0, 1), Role::Primary).await;
        // Runtime sets status after replicator confirms role change
        runtime_set_status_for_role(&state, Role::Primary);

        assert_eq!(state.read_status(), AccessStatus::Granted);
        assert_eq!(state.write_status(), AccessStatus::Granted);

        // LSNs are handed out sequentially, starting at 1.
        let lsn = send_replicate(&data_tx, "hello").await;
        assert_eq!(lsn, 1);
        assert_eq!(state.current_progress(), 1);

        let lsn = send_replicate(&data_tx, "world").await;
        assert_eq!(lsn, 2);

        send_change_role(&control_tx, Epoch::new(0, 2), Role::ActiveSecondary).await;
        // Runtime sets status after replicator confirms role change
        runtime_set_status_for_role(&state, Role::ActiveSecondary);

        assert_eq!(state.read_status(), AccessStatus::NotPrimary);
        assert_eq!(state.write_status(), AccessStatus::NotPrimary);

        // Close must make the actor task finish.
        send_close(&control_tx).await;
        task.await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_replicate_handle() {
        use crate::handles::StateReplicatorHandle;

        let (control_tx, data_tx, state, task) = spawn_noop();
        let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());

        send_open(&control_tx).await;
        send_change_role(&control_tx, Epoch::new(0, 1), Role::Primary).await;

        // Runtime sets status after replicator confirms role change
        runtime_set_status_for_role(&state, Role::Primary);

        // Replicate via handle
        let token = CancellationToken::new();
        let lsn = replicator_handle
            .replicate(bytes::Bytes::from("test"), token)
            .await
            .unwrap();
        assert_eq!(lsn, 1);

        send_close(&control_tx).await;
        task.await.unwrap();
    }

    #[tokio::test]
    async fn test_noop_replicate_not_primary() {
        use crate::handles::StateReplicatorHandle;

        // `_control_tx` is kept alive so the replicator keeps running for the
        // whole test; the task itself is intentionally never joined.
        let (_control_tx, data_tx, state, _task) = spawn_noop();
        let replicator_handle = StateReplicatorHandle::new(data_tx.clone(), state.clone());

        // Don't promote to primary — status is NotPrimary
        let token = CancellationToken::new();
        let result = replicator_handle
            .replicate(bytes::Bytes::from("test"), token)
            .await;

        assert!(matches!(result, Err(crate::KubericError::NotPrimary)));
    }
}